Optionally use pickle5 #3849

Merged (9 commits) on Jul 10, 2020
Conversation

@jakirkham (Member) commented Jun 2, 2020

Fixes #2495

This extends the changes in PR ( #3784 ) to also support using pickle5 (when available). Depends on PR ( cloudpipe/cloudpickle#370 ), which is in cloudpickle version 1.5.0.

cc @pierreglaser @pitrou
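
For readers unfamiliar with pickle5: the point of the change is to let serialization use pickle protocol 5 out-of-band buffers, via the pickle5 backport on Pythons older than 3.8. Below is a minimal sketch of the optional-import pattern and the round trip it enables; it illustrates the idea only and is not the exact code added in this PR.

    try:
        import pickle5 as pickle  # backport of protocol 5 for Python < 3.8
    except ImportError:
        import pickle  # Python 3.8+ ships protocol 5 natively

    # With protocol 5, large buffers can travel out-of-band instead of being
    # copied into the pickle stream; the receiver reattaches them via `buffers`.
    buffers = []
    payload = pickle.dumps(
        pickle.PickleBuffer(b"x" * 1_000_000),
        protocol=5,
        buffer_callback=buffers.append,
    )
    restored = pickle.loads(payload, buffers=buffers)
    assert bytes(restored) == b"x" * 1_000_000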

@jakirkham (Member, Author) commented Jun 10, 2020

Am seeing this failure. After debugging this more closely, it appears we need this change ( cloudpipe/cloudpickle#368 (review) ).

__________________________ test_feed_large_bytestring __________________________
    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:

            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)

                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)

                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]

                    try:
                        start = time()
                        while time() < start + 5:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()

                    return result

            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:954: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda/envs/dask-distributed/lib/python3.6/site-packages/tornado/ioloop.py:576: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:912: in coro
    result = await future
../../../miniconda/envs/dask-distributed/lib/python3.6/asyncio/tasks.py:358: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

s = <Scheduler: "tcp://127.0.0.1:42742" processes: 0 cores: 0>
a = <Worker: 'tcp://127.0.0.1:33191', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:37948', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster()
    async def test_feed_large_bytestring(s, a, b):
        np = pytest.importorskip("numpy")

        x = np.ones(10000000)

        def func(scheduler):
            y = x
            return True

        comm = await connect(s.address)
        await comm.write({"op": "feed", "function": dumps(func), "interval": 0.05})

        for i in range(5):
            response = await comm.read()
>           assert response is True
E           AssertionError: assert {'exception': ValueError('The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()'...ay with more than one element is ambiguous. Use a.any() or a.all()', 'traceback': <traceback object at 0x7f0a35b87cc8>} is True

distributed/tests/test_scheduler.py:377: AssertionError

Edit: Fixed upstream.
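
For context on the error text in that log: the ValueError is NumPy's standard complaint when an array is used where a single boolean is expected, for example (illustration only, not code from the test suite):

    import numpy as np

    arr = np.ones(3)
    bool(arr)  # ValueError: The truth value of an array with more than one
               # element is ambiguous. Use a.any() or a.all()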

@jakirkham (Member, Author) commented Jun 11, 2020

Am seeing this failure (snippet below). Think it has something to do with how the dispatch_table is being used in cloudpickle. Discussion can be found in thread ( cloudpipe/cloudpickle#368 (review) ).

__________________________ test_chained_error_message __________________________

c = <Client: not connected>
s = <Scheduler: "tcp://127.0.0.1:46028" processes: 0 cores: 0>
a = <Worker: 'tcp://127.0.0.1:44402', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:45385', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_chained_error_message(c, s, a, b):
        def chained_exception_fn():
            class MyException(Exception):
                def __init__(self, msg):
                    self.msg = msg
    
                def __str__(self):
                    return "MyException(%s)" % self.msg
    
            exception = MyException("Foo")
            inner_exception = MyException("Bar")
    
            try:
                raise inner_exception
            except Exception as e:
                raise exception from e
    
        f = c.submit(chained_exception_fn)
        try:
>           await f

distributed/tests/test_worker.py:406: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: chained_exception_fn-103274397419cf5bcfd188fdd51fd53a>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
>               raise exc.with_traceback(tb)

distributed/client.py:235: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   raise exception from e
E   test_worker.test_chained_error_message.<locals>.chained_exception_fn.<locals>.MyException: MyException(Foo)

distributed/tests/test_worker.py:402: MyException

During handling of the above exception, another exception occurred:

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 5:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

distributed/utils_test.py:954: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../miniconda/envs/dask-distributed/lib/python3.7/site-packages/tornado/ioloop.py:532: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:912: in coro
    result = await future
../../../miniconda/envs/dask-distributed/lib/python3.7/asyncio/tasks.py:442: in wait_for
    return fut.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

c = <Client: not connected>
s = <Scheduler: "tcp://127.0.0.1:46028" processes: 0 cores: 0>
a = <Worker: 'tcp://127.0.0.1:44402', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:45385', 1, Status.closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_chained_error_message(c, s, a, b):
        def chained_exception_fn():
            class MyException(Exception):
                def __init__(self, msg):
                    self.msg = msg
    
                def __str__(self):
                    return "MyException(%s)" % self.msg
    
            exception = MyException("Foo")
            inner_exception = MyException("Bar")
    
            try:
                raise inner_exception
            except Exception as e:
                raise exception from e
    
        f = c.submit(chained_exception_fn)
        try:
            await f
        except Exception as e:
>           assert e.__cause__ is not None
E           AssertionError: assert None is not None
E            +  where None = MyException('Foo').__cause__

distributed/tests/test_worker.py:408: AssertionError

Edit: I should add that this is the only failure I'm seeing in the full test suite.
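
For reference, the failing assertion checks ordinary exception-chaining semantics: `raise ... from ...` records the inner exception on `__cause__`, and distributed is expected to carry that link back from the worker to the client. A minimal illustration of the invariant (not code from this PR):

    inner_exception = ValueError("Bar")
    try:
        try:
            raise inner_exception
        except ValueError as e:
            raise RuntimeError("Foo") from e
    except RuntimeError as exc:
        # `raise ... from` sets __cause__; losing this link during
        # (de)serialization is what the failing test above detects.
        assert exc.__cause__ is inner_exception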

@jakirkham (Member, Author) commented:

We now have all tests passing using the development version of cloudpickle 😄

@jakirkham (Member, Author) commented:

@fjetter, if you have time/interest, it might be worth trying this out. You would need to install pickle5 and a cloudpickle build based off of PR ( cloudpipe/cloudpickle#370 ). If you do try this, I would be interested in hearing how things go for you 🙂

@jakirkham changed the title from "WIP, RFC: Optionally use pickle5" to "Optionally use pickle5" on Jul 1, 2020
@jakirkham marked this pull request as ready for review on July 1, 2020 18:10
Commit message: Drop the `cloudpickle` hack as we now have a release with our needs met, which we included in our requirements. Also ensure we have the latest `pickle5` with fixes for testing.
@jakirkham (Member, Author) commented:

cc-ing @jcrist (in case you are interested 😉)

@quasiben (Member) commented Jul 2, 2020

Generally, this looks good. I spoke briefly with @jakirkham about this and thought I should bring it up here as well. Do others have issues with bumping the cloudpickle version? Are there Dask subprojects that pin cloudpickle to a particular version for a particular reason?

@mrocklin (Member) commented Jul 2, 2020 via email

@jakirkham (Member, Author) commented:

That's a fair concern. We could try to add some conditional logic here to handle an older cloudpickle. Alternatively, we could just wait on merging this until after today's release ships, which would give us a 2-week gap between the cloudpickle release and this change.
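
A hypothetical sketch of that kind of conditional logic (the names and structure here are assumptions, not the code that was ultimately merged): only forward out-of-band buffers when the installed cloudpickle is new enough to accept them.

    import cloudpickle

    # cloudpickle 1.5.0 added out-of-band buffer support (cloudpipe/cloudpickle#370).
    CLOUDPICKLE_GE_150 = tuple(
        int(part) for part in cloudpickle.__version__.split(".")[:2]
    ) >= (1, 5)


    def serialize(obj, buffer_callback=None):
        """Pickle obj, collecting out-of-band buffers when supported."""
        if CLOUDPICKLE_GE_150 and buffer_callback is not None:
            # Assumes protocol 5 is available (Python 3.8+ or the pickle5 backport).
            return cloudpickle.dumps(obj, protocol=5, buffer_callback=buffer_callback)
        # Older cloudpickle: fall back to in-band pickling.
        return cloudpickle.dumps(obj)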

@mrocklin (Member) commented Jul 2, 2020 via email

@jakirkham (Member, Author) commented:

2.20.0 is now out. So I think we should be good here. Please let me know if we need anything else 🙂

@mrocklin (Member) commented Jul 6, 2020

How confident are we that defaults will pick this up by the next release cycle?

@jakirkham (Member, Author) commented:

Sorry, pick up what?

@mrocklin (Member) commented Jul 6, 2020

cloudpickle 1.5

@jakirkham (Member, Author) commented:

Sure. I mean, it will be a requirement to update distributed, so I would think yes.

@jakirkham (Member, Author) commented:

cc @jjhelmus (in case you have thoughts on updating cloudpickle in defaults)

@jjhelmus (Contributor) commented Jul 6, 2020

I do not foresee any issue updating cloudpickle prior to the next release of distributed in defaults. We are currently holding off on building new packages as we prepare for a release of Anaconda Individual Edition, but once that is out, updates should proceed at a normal pace.

In the future, @anaconda-pkg-build is a good handle to ping on this type of issue, as it goes to multiple engineers at Anaconda who do packaging.

@jakirkham (Member, Author) commented:

In the future, @anaconda-pkg-build is a good handle to ping on this type of issue, as it goes to multiple engineers at Anaconda who do packaging.

Good tip! Thanks, Jonathan. Will keep this in mind for the future 🙂

@jakirkham (Member, Author) commented:

Any other questions, @mrocklin? 🙂

@mrocklin (Member) commented Jul 9, 2020

1.5 seems to be released. If we're highly confident that there won't be any issues with this, then yes, let's go ahead. If you think there is some small chance of issues, then let's wait. I think that you're probably the most educated person to make this call :)
