Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broken distributed test #1903

Closed
fmaussion opened this issue Feb 12, 2018 · 2 comments · Fixed by #1904
Closed

Broken distributed test #1903

fmaussion opened this issue Feb 12, 2018 · 2 comments · Fixed by #1904

Comments

@fmaussion
Copy link
Member

The recent distributed update (1.20.2) broke a test: https://github.com/pydata/xarray/blob/master/xarray/tests/test_distributed.py#L57-L84

It fails with:

>       assert s.task_state
E       AttributeError: 'Scheduler' object has no attribute 'task_state'
<details>

```
__________________________________ test_async __________________________________
def test_func():
    # Restore default logging levels
    # XXX use pytest hooks/fixtures instead?
    for name, level in logging_levels.items():
        logging.getLogger(name).setLevel(level)

    old_globals = _globals.copy()
    result = None
    workers = []

    with pristine_loop() as loop:
        with check_active_rpc(loop, active_rpc_timeout):
            @gen.coroutine
            def coro():
                for i in range(5):
                    try:
                        s, ws = yield start_cluster(
                            ncores, scheduler, loop, security=security,
                            Worker=Worker, scheduler_kwargs=scheduler_kwargs,
                            worker_kwargs=worker_kwargs)
                    except Exception:
                        logger.error("Failed to start gen_cluster, retryng")
                    else:
                        break
                workers[:] = ws
                args = [s] + workers
                if client:
                    c = yield Client(s.address, loop=loop, security=security,
                                     asynchronous=True)
                    args = [c] + args
                try:
                    result = yield func(*args)
                    if s.validate:
                        s.validate_state()
                finally:
                    if client:
                        yield c._close()
                    yield end_cluster(s, workers)
                    _globals.clear()
                    _globals.update(old_globals)

                raise gen.Return(result)
>           result = loop.run_sync(coro, timeout=timeout)

../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/distributed/utils_test.py:749:


../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/ioloop.py:458: in run_sync
return future_cell[0].result()
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/concurrent.py:238: in result
raise_exc_info(self._exc_info)
<string>:4: in raise_exc_info
???
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/gen.py:1069: in run
yielded = self.gen.send(value)
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/distributed/utils_test.py:737: in coro
result = yield func(*args)
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/gen.py:1055: in run
value = future.result()
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/concurrent.py:238: in result
raise_exc_info(self._exc_info)
<string>:4: in raise_exc_info
???
../../../../.pyvirtualenvs/py3/lib/python3.5/site-packages/tornado/gen.py:1069: in run
yielded = self.gen.send(value)


c = <Client: not connected>
s = <Scheduler: "tcp://127.0.0.1:38907" processes: 0 cores: 0>
a = <Worker: tcp://127.0.0.1:46497, closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: tcp://127.0.0.1:33287, closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

@pytest.mark.skipif(distributed.__version__ <= '1.19.3',
                    reason='Need recent distributed version to clean up get')
@gen_cluster(client=True, timeout=None)
def test_async(c, s, a, b):
    x = create_test_data()
    assert not dask.is_dask_collection(x)
    y = x.chunk({'dim2': 4}) + 10
    assert dask.is_dask_collection(y)
    assert dask.is_dask_collection(y.var1)
    assert dask.is_dask_collection(y.var2)

    z = y.persist()
    assert str(z)

    assert dask.is_dask_collection(z)
    assert dask.is_dask_collection(z.var1)
    assert dask.is_dask_collection(z.var2)
    assert len(y.__dask_graph__()) > len(z.__dask_graph__())

    assert not futures_of(y)
    assert futures_of(z)

    future = c.compute(z)
    w = yield future
    assert not dask.is_dask_collection(w)
    assert_allclose(x + 10, w)
>   assert s.task_state

E   AttributeError: 'Scheduler' object has no attribute 'task_state'
```

</details>
@jhamman
Copy link
Member

jhamman commented Feb 12, 2018

@mrocklin - was there a namespace change with the task_state property?

mrocklin added a commit to mrocklin/xarray that referenced this issue Feb 12, 2018
This internal state was changed in the latest release

Fixes pydata#1903
@mrocklin
Copy link
Contributor

mrocklin commented Feb 12, 2018 via email

fmaussion pushed a commit that referenced this issue Feb 12, 2018
This internal state was changed in the latest release

Fixes #1903
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants