Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raise CommClosedError() distributed.comm.core.CommClosedError with tornado.iostream.StreamClosedError: Stream is closedz #4103

Open
agniszczotka opened this issue Sep 10, 2020 · 3 comments

Comments

@agniszczotka
Copy link

What happened: When creating large data frames and saving to parquet there is an error:

Task exception was never retrieved
future: <Task finished coro=<connect.._() done, defined at /Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py:279> exception=CommClosedError()>
Traceback (most recent call last):
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py", line 288, in _
handshake = await asyncio.wait_for(comm.read(), 1)
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/core.py", line 295, in _
raise CommClosedError()
distributed.comm.core.CommClosedError
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:57234
Traceback (most recent call last):
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 186, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 1983, in gather_dep
self.rpc, deps, worker, who=self.address
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 3258, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/utils_comm.py", line 390, in retry_operation
operation=operation,
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/utils_comm.py", line 370, in retry
return await coro()
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/worker.py", line 3245, in _get_data
max_connections=max_connections,
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/core.py", line 666, in send_recv
response = await comm.read(deserializers=deserializers)
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 201, in read
convert_stream_closed_error(self, e)
File "/Users/agnieszkaszczotka/.pyenv/versions/anaconda3-2019.10/envs/bayesian/lib/python3.6/site-packages/distributed/comm/tcp.py", line 125, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in : Stream is closed

What you expected to happen: no error

Minimal Complete Verifiable Example:

    from dask.distributed import Client
    from scipy import sparse
    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    client = Client()
    n = 10000
    columns1 = np.arange(n).astype(str)
    columns2 = np.arange(start=1000, stop=n + 1000).astype(str)

    df1 = pd.DataFrame.sparse.from_spmatrix(sparse.eye(n), index=columns1, columns=columns1)  # Create a pandas df
    df2 = pd.DataFrame.sparse.from_spmatrix(sparse.eye(n), index=columns2, columns=columns2)  # Create a pandas df
    ddf1 = dd.from_pandas(df1, chunksize=1000, sort=True)  # Create a dask.dataframe with known divisions
    ddf2 = dd.from_pandas(df2, chunksize=1000, sort=True)  # Create a dask.dataframe with known divisions

    ddfs = ddf1.add(ddf2)
    ddfs = ddfs.map_partitions(lambda x: x.sparse.to_dense().fillna(0.0))
    print(ddfs.head())
    ddfs.to_parquet(path='save', compute=True)

Anything else we need to know?: the variable n is setting the size of dataframe, when n is small e.g 100 the error does not show up.

Environment:

  • Dask version: 2.25.0
  • Python version: 3.6 anaconda3-2019.10
  • Operating System: Mac Catalina 10.15.6
  • Install method (conda, pip, source): conda
@quasiben
Copy link
Member

Thanks for the report @agniszczotka . When I ran I got a different error:

distributed.worker - WARNING -  Compute Failed
Function:  execute_task
args:      ((<function apply at 0x7ff1f6bd40d0>, <function ArrowEngine.write_partition at 0x7ff17fb1a430>, [                       0    1   10  100  1000  10000  10001  10002  10003  10004  10005  10006  10007  10008  10009  1001  10010  10011  10012  10013  ...  9981  9982  9983  9984  9985  9986  9987  9988  9989  999  9990  9991  9992  9993  9994  9995  9996  9997  9998  9999
__null_dask_index__                                                                                                                                    ...
8199                 0.0  0.0  0.0  0.0   0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0   0.0    0.0    0.0    0.0    0.0  ...   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0
82                   0.0  0.0  0.0  0.0   0.0    0.0    0.0    0.0    0.0    0.0
kwargs:    {}
Exception: ArrowTypeError('Did not pass numpy.dtype object', 'Conversion failed for column 0 with type float64')

I probably need to upgrade arrow to 1.X (I currently have 0.17 installed. What are version of arrow are you using ? or if not arrow, fastparquet ?

@quasiben
Copy link
Member

After upgrading arrow i was not able to reproduce on Linux:

In [10]: ddfs.to_parquet(path='save', compute=True)

In [11]: import dask, distributed, pyarrow

In [12]: dask.__version__, distributed.__version__, pyarrow.__version__
Out[12]: ('2.25.0', '2.25.0', '1.0.1')

@agniszczotka
Copy link
Author

I do use fastparquet. could you check with fastparquet?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants