
Cannot persist dask.dataframes #4492

Open
dantegd opened this issue Feb 8, 2021 · 15 comments
dantegd commented Feb 8, 2021

What happened:

DataFrame collections like dask dataframes or dask-cudf cannot be persisted after release 2021.2.0. @wphicks triaged the regression and found that the issue started after #4406 was merged.

What you expected to happen:

Persist to work (see reproducer)

Minimal Complete Verifiable Example:

import dask.dataframe as dd
import numpy as np
import pandas as pd

from dask.distributed import Client
from dask.distributed import LocalCluster


def persist_across_workers(client, objects, workers=None):
    if workers is None:
        # Default to all workers
        workers = client.has_what().keys()
    return client.persist(objects, workers={o: workers for o in objects})


if __name__ == "__main__":

    cluster = LocalCluster()
    client = Client(cluster)

    X = np.ones((10000, 20))

    X_df = pd.DataFrame(X)
    X_dist = dd.from_pandas(X_df, npartitions=2)

    X_f = persist_across_workers(client, X_dist)

Output:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
distributed.comm.utils - ERROR - can not serialize 'dict_keys' object
Traceback (most recent call last):
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
    protocol.dumps(
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/tcp.py", line 230, in write
    frames = await to_frames(
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 52, in to_frames
    return _to_frames()
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/comm/utils.py", line 32, in _to_frames
    protocol.dumps(
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 39, in dumps
    small_header, small_payload = dumps_msgpack(msg, **compress_opts)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/distributed/protocol/core.py", line 184, in dumps_msgpack
    payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
  File "/home/galahad/miniconda3/envs/ns0208/lib/python3.8/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 292, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 298, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 295, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 289, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'dict_keys' object
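The final TypeError points at the root cause: msgpack can only pack concrete containers, not a `dict_keys` view such as the one returned by `client.has_what().keys()`. A minimal standalone illustration (assumes the `msgpack` package is installed; the addresses are made up):

```python
import msgpack

d = {"tcp://127.0.0.1:33333": [], "tcp://127.0.0.1:44444": []}

# A dict_keys view (what client.has_what().keys() returns) cannot be packed:
try:
    msgpack.packb(d.keys(), use_bin_type=True)
except TypeError as e:
    print(e)  # can not serialize 'dict_keys' object

# Converting the view to a concrete list makes it serializable:
payload = msgpack.packb(list(d.keys()), use_bin_type=True)
assert msgpack.unpackb(payload, raw=False) == list(d.keys())
```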

Environment:

  • Dask version: 2021.2.0
  • Distributed version: 2021.2.0 from conda and built from master after Unify annotations #4406
  • Python version: 3.7 and 3.8
  • Operating System: Linux / AMD64
  • Install method (conda, pip, source): conda and from source

cc @jakirkham @pentschev @madsbk @wphicks

@jakirkham

cc @ian-r-rose

jrbourbeau commented Feb 8, 2021

Thanks for the nice example @dantegd. In #4406 we removed the ability to pass Dask collections to the priority= and workers= keywords, as doing so was broken in many cases. Instead, you can now use Dask's new dask.annotate machinery in these cases, which should hopefully be more robust.

For the above example, this means changing

    client.persist(objects, workers={o: workers for o in objects})

to

    with dask.annotate(workers=set(workers)):
        return client.persist(objects)
Details:
import dask.dataframe as dd
import numpy as np
import pandas as pd

import dask
from dask.distributed import Client
from dask.distributed import LocalCluster


def persist_across_workers(client, objects, workers=None):
    if workers is None:
        # Default to all workers
        workers = client.has_what().keys()
    with dask.annotate(workers=set(workers)):
        return client.persist(objects)


if __name__ == "__main__":

    cluster = LocalCluster()
    client = Client(cluster)

    X = np.ones((10000, 20))

    X_df = pd.DataFrame(X)
    X_dist = dd.from_pandas(X_df, npartitions=2)

    X_f = persist_across_workers(client, X_dist)

@jakirkham

Should we improve the error message here, then? Maybe drop this flag? Something else? As it is, the error message (msgpack being unable to serialize something) does not make it obvious that this usage is unsupported.

@ian-r-rose

Yes, that's right @jrbourbeau .

@jakirkham I agree that an improved error message would be helpful. At the very least, we could do a better job of ensuring that the shape of the priority/workers/etc. arguments makes sense (i.e., an iterable for workers, a number for priority, and a clear error for a dict of collections).
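The validation suggested above could look roughly like the following. This is a hypothetical sketch, not code from distributed: the helper name `validate_annotations` and its error messages are made up, but it checks the three shapes mentioned (iterable of address strings for workers, a number for priority, and an explicit error for a dict of collections):

```python
from numbers import Number


def validate_annotations(workers=None, priority=None):
    """Hypothetical early validation of persist/compute keyword arguments.

    Raises a clear TypeError instead of letting a bad value reach msgpack.
    """
    if workers is not None:
        if isinstance(workers, dict):
            # A dict keyed by Dask collections is no longer supported
            raise TypeError(
                "Passing a dict of collections to workers= is not supported; "
                "use dask.annotate(workers=...) instead"
            )
        if isinstance(workers, str):
            workers = [workers]
        workers = list(workers)  # coerce views like dict_keys to a list
        if not all(isinstance(w, str) for w in workers):
            raise TypeError("workers= must be an iterable of address strings")
    if priority is not None and not isinstance(priority, Number):
        raise TypeError("priority= must be a number")
    return workers, priority
```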

quasiben commented Feb 8, 2021

@trivialfis you might be interested in this -- I think xgboost may do similar things?

@jakirkham

cc @hcho3 (from xgboost as well)

jrbourbeau commented Feb 8, 2021

I think xgboost maybe does similar things ?

Good to know. Briefly looking through xgboost, they specify individual worker addresses in workers= without any Dask collections, so that shouldn't be impacted by any of the recent changes
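For reference, the usage that remains supported is passing a plain iterable of worker address strings. Since `client.has_what().keys()` returns a `dict_keys` view, which msgpack cannot serialize, a small helper can coerce the argument into a concrete list first. This is a hypothetical sketch (`normalize_workers` is not part of distributed):

```python
def normalize_workers(workers):
    """Coerce a workers= argument into a msgpack-friendly list of addresses.

    Accepts None, a single address string, or any iterable of address
    strings (including dict_keys views from client.has_what().keys()).
    """
    if workers is None:
        return None
    if isinstance(workers, str):
        return [workers]
    return list(workers)
```

With this, `client.persist(objects, workers=normalize_workers(client.has_what().keys()))` would pass a plain list of addresses rather than a view.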

wphicks commented Feb 9, 2021

@hcho3 is out for a bit, but it sounds like we're good on the xgboost front? @trivialfis, please feel free to hit me up if you need extra hands/eyes on that.

quasiben commented Feb 9, 2021

Good to know. Briefly looking through xgboost, they specify individual worker addresses in workers= without any Dask collections, so that shouldn't be impacted by any of the recent changes

Thanks for checking @jrbourbeau

rapids-bot bot pushed a commit to rapidsai/cuml that referenced this issue Feb 9, 2021
Fixes the current CI issue dask/distributed#4492

Authors:
  - Dante Gama Dessavre (@dantegd)
  - Michael Demoret (@mdemoret-nv)

Approvers:
  - John Zedlewski (@JohnZed)
  - William Hicks (@wphicks)
  - @jakirkham
  - Corey J. Nolet (@cjnolet)

URL: #3474
@jakirkham

Just checking in here, was there anything else we still need to do or is this ok to close now?

@jrbourbeau

It was proposed that we could improve the error message that is raised (#4492 (comment)), but otherwise I think things have already been fixed upstream in dask-cudf.

@trivialfis

I'm sorry for completely missing this thread until I tried to dig up some old emails...

Good to know. Briefly looking through xgboost, they specify individual worker addresses in workers= without any Dask collections, so that shouldn't be impacted by any of the recent changes

We do have a line that might be related: https://github.com/dmlc/xgboost/blob/905fdd3e08d91077aada776346c7e49e4ff69334/python-package/xgboost/dask.py#L335 — copied here:

data = client.persist(data)

But we just use the default value for workers. So should be safe?

@jakirkham

If you don't need the workers to be specified, I would not specify them

@jrbourbeau

Yeah, as John mentioned, you should be fine since you're not explicitly specifying a set of workers

@trivialfis

@jakirkham @jrbourbeau Thanks for the advice!
