Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test_dont_select_closed_worker flaky #6507

Closed
gjoseph92 opened this issue Jun 3, 2022 · 2 comments · Fixed by #6865
Closed

test_dont_select_closed_worker flaky #6507

gjoseph92 opened this issue Jun 3, 2022 · 2 comments · Fixed by #6865

Comments

@gjoseph92
Copy link
Collaborator

This looks somewhat similar to #6506 (they both have coroutine 'InProc.write' was never awaited).

cls = <class '_pytest.runner.CallInfo'>
func = <function call_runtest_hook.<locals>.<lambda> at 0x7fdcf3541ca0>
when = 'teardown'
reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)

    @classmethod
    def from_call(
        cls,
        func: "Callable[[], TResult]",
        when: "Literal['collect', 'setup', 'call', 'teardown']",
        reraise: Optional[
            Union[Type[BaseException], Tuple[Type[BaseException], ...]]
        ] = None,
    ) -> "CallInfo[TResult]":
        """Call func, wrapping the result in a CallInfo.
    
        :param func:
            The function to call. Called without arguments.
        :param when:
            The phase in which the function is called.
        :param reraise:
            Exception or exceptions that shall propagate if raised by the
            function, instead of being wrapped in the CallInfo.
        """
        excinfo = None
        start = timing.time()
        precise_start = timing.perf_counter()
        try:
>           result: Optional[TResult] = func()

/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/runner.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/runner.py:259: in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/pluggy/_hooks.py:265: in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/pluggy/_manager.py:80: in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/unraisableexception.py:93: in pytest_runtest_teardown
    yield from unraisable_exception_runtest_hook()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
        with catch_unraisable_exception() as cm:
            yield
            if cm.unraisable:
                if cm.unraisable.err_msg is not None:
                    err_msg = cm.unraisable.err_msg
                else:
                    err_msg = "Exception ignored in"
                msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
                msg += "".join(
                    traceback.format_exception(
                        cm.unraisable.exc_type,
                        cm.unraisable.exc_value,
                        cm.unraisable.exc_traceback,
                    )
                )
>               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object InProc.write at 0x7fdcf39ac0c0>
E               
E               Traceback (most recent call last):
E                 File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/warnings.py", line 506, in _warn_unawaited_coroutine
E                   warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
E               RuntimeWarning: coroutine 'InProc.write' was never awaited

/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning
@gjoseph92
Copy link
Collaborator Author

Hm, trying to run locally on main I get a different issue

==================================================================================== FAILURES =====================================================================================
_____________________________________________________________________ test_dont_select_closed_worker[40-1000] _____________________________________________________________________

self = LocalCluster(0af1b62c, '<Not Connected>', workers=0, threads=0, memory=0 B)

    async def _start(self):
        while self.status == Status.starting:
            await asyncio.sleep(0.01)
        if self.status == Status.running:
            return
        if self.status == Status.closed:
            raise ValueError("Cluster is closed")
    
        self._lock = asyncio.Lock()
        self.status = Status.starting
    
        if self.scheduler_spec is None:
            try:
                import distributed.dashboard  # noqa: F401
            except ImportError:
                pass
            else:
                options = {"dashboard": True}
            self.scheduler_spec = {"cls": Scheduler, "options": options}
    
        try:
            # Check if scheduler has already been created by a subclass
            if self.scheduler is None:
                cls = self.scheduler_spec["cls"]
                if isinstance(cls, str):
                    cls = import_term(cls)
>               self.scheduler = cls(**self.scheduler_spec.get("options", {}))

distributed/deploy/spec.py:293: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f8ae02f8630>, loop = None, delete_interval = '500ms'
synchronize_worker_interval = '60s', services = {}, service_kwargs = None, allowed_failures = 3, extensions = None, validate = False, scheduler_file = None
security = Security(require_encryption=False, tls_min_version=771), worker_ttl = '5 minutes', idle_timeout = None, interface = None, host = '127.0.0.1', port = 0
protocol = 'tcp://', dashboard_address = ':0', dashboard = True, http_prefix = '/', preload = [], preload_argv = [], plugins = (), contact_address = None
transition_counter_max = False, kwargs = {'blocked_handlers': None}
http_server_modules = ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']
show_dashboard = True, distributed = <module 'distributed' from '/Users/gabe/dev/distributed/distributed/__init__.py'>
routes = [('/metrics', <class 'distributed.http.scheduler.prometheus.core.PrometheusHandler'>, {'dask_server': <[AttributeError...teError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f8ae02f8630>}), ...]

    def __init__(
        self,
        loop=None,
        delete_interval="500ms",
        synchronize_worker_interval="60s",
        services=None,
        service_kwargs=None,
        allowed_failures=None,
        extensions=None,
        validate=None,
        scheduler_file=None,
        security=None,
        worker_ttl=None,
        idle_timeout=None,
        interface=None,
        host=None,
        port=0,
        protocol=None,
        dashboard_address=None,
        dashboard=None,
        http_prefix="/",
        preload=None,
        preload_argv=(),
        plugins=(),
        contact_address=None,
        transition_counter_max=False,
        **kwargs,
    ):
        if loop is not None:
            warnings.warn(
                "the loop kwarg to Scheduler is deprecated",
                DeprecationWarning,
                stacklevel=2,
            )
    
        self.loop = IOLoop.current()
        self._setup_logging(logger)
    
        # Attributes
        if contact_address is None:
            contact_address = dask.config.get("distributed.scheduler.contact-address")
        self.contact_address = contact_address
        if allowed_failures is None:
            allowed_failures = dask.config.get("distributed.scheduler.allowed-failures")
        self.allowed_failures = allowed_failures
        if validate is None:
            validate = dask.config.get("distributed.scheduler.validate")
        self.proc = psutil.Process()
        self.delete_interval = parse_timedelta(delete_interval, default="ms")
        self.synchronize_worker_interval = parse_timedelta(
            synchronize_worker_interval, default="ms"
        )
        self.digests = None
        self.service_specs = services or {}
        self.service_kwargs = service_kwargs or {}
        self.services = {}
        self.scheduler_file = scheduler_file
        worker_ttl = worker_ttl or dask.config.get("distributed.scheduler.worker-ttl")
        self.worker_ttl = parse_timedelta(worker_ttl) if worker_ttl else None
        idle_timeout = idle_timeout or dask.config.get(
            "distributed.scheduler.idle-timeout"
        )
        if idle_timeout:
            self.idle_timeout = parse_timedelta(idle_timeout)
        else:
            self.idle_timeout = None
        self.idle_since = time()
        self.time_started = self.idle_since  # compatibility for dask-gateway
        self._lock = asyncio.Lock()
        self.bandwidth_workers = defaultdict(float)
        self.bandwidth_types = defaultdict(float)
    
        if not preload:
            preload = dask.config.get("distributed.scheduler.preload")
        if not preload_argv:
            preload_argv = dask.config.get("distributed.scheduler.preload-argv")
        self.preloads = preloading.process_preloads(self, preload, preload_argv)
    
        if isinstance(security, dict):
            security = Security(**security)
        self.security = security or Security()
        assert isinstance(self.security, Security)
        self.connection_args = self.security.get_connection_args("scheduler")
        self.connection_args["handshake_overrides"] = {  # common denominator
            "pickle-protocol": 4
        }
    
        self._start_address = addresses_from_user_args(
            host=host,
            port=port,
            interface=interface,
            protocol=protocol,
            security=security,
            default_port=self.default_port,
        )
    
        http_server_modules = dask.config.get("distributed.scheduler.http.routes")
        show_dashboard = dashboard or (dashboard is None and dashboard_address)
        # install vanilla route if show_dashboard but bokeh is not installed
        if show_dashboard:
            try:
                import distributed.dashboard.scheduler
            except ImportError:
                show_dashboard = False
                http_server_modules.append("distributed.http.scheduler.missing_bokeh")
        routes = get_handlers(
            server=self, modules=http_server_modules, prefix=http_prefix
        )
>       self.start_http_server(routes, dashboard_address, default_port=8787)

distributed/scheduler.py:2965: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f8ae02f8630>
routes = [('/metrics', <class 'distributed.http.scheduler.prometheus.core.PrometheusHandler'>, {'dask_server': <[AttributeError...teError("'Scheduler' object has no attribute '_address'") raised in repr()] Scheduler object at 0x7f8ae02f8630>}), ...]
dashboard_address = ':0', default_port = 8787, ssl_options = None

    def start_http_server(
        self, routes, dashboard_address, default_port=0, ssl_options=None
    ):
        """This creates an HTTP Server running on this node"""
    
        self.http_application = RoutingApplication(routes)
    
        # TLS configuration
        tls_key = dask.config.get("distributed.scheduler.dashboard.tls.key")
        tls_cert = dask.config.get("distributed.scheduler.dashboard.tls.cert")
        tls_ca_file = dask.config.get("distributed.scheduler.dashboard.tls.ca-file")
        if tls_cert:
            import ssl
    
            ssl_options = ssl.create_default_context(
                cafile=tls_ca_file, purpose=ssl.Purpose.CLIENT_AUTH
            )
            ssl_options.load_cert_chain(tls_cert, keyfile=tls_key)
    
        self.http_server = HTTPServer(self.http_application, ssl_options=ssl_options)
    
        http_addresses = clean_dashboard_address(dashboard_address or default_port)
    
        for http_address in http_addresses:
            if http_address["address"] is None:
                address = self._start_address
                if isinstance(address, (list, tuple)):
                    address = address[0]
                if address:
                    with suppress(ValueError):
                        http_address["address"] = get_address_host(address)
    
            change_port = False
            retries_left = 3
            while True:
                try:
                    if not change_port:
                        self.http_server.listen(**http_address)
                    else:
>                       self.http_server.listen(**tlz.merge(http_address, {"port": 0}))

distributed/node.py:158: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <tornado.httpserver.HTTPServer object at 0x7f8b04694040>, port = 0, address = ''

    def listen(self, port: int, address: str = "") -> None:
        """Starts accepting connections on the given port.
    
        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start
        the `.IOLoop`.
        """
>       sockets = bind_sockets(port, address=address)

../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpserver.py:151: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

port = 0, address = None, family = <AddressFamily.AF_UNSPEC: 0>, backlog = 128, flags = <AddressInfo.AI_PASSIVE: 1>, reuse_port = False

    def bind_sockets(
        port: int,
        address: Optional[str] = None,
        family: socket.AddressFamily = socket.AF_UNSPEC,
        backlog: int = _DEFAULT_BACKLOG,
        flags: Optional[int] = None,
        reuse_port: bool = False,
    ) -> List[socket.socket]:
        """Creates listening sockets bound to the given port and address.
    
        Returns a list of socket objects (multiple sockets are returned if
        the given address maps to multiple IP addresses, which is most common
        for mixed IPv4 and IPv6 use).
    
        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.
    
        The ``backlog`` argument has the same meaning as for
        `socket.listen() <socket.socket.listen>`.
    
        ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
        ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.
    
        ``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket
        in the list. If your platform doesn't support this option ValueError will
        be raised.
        """
        if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
            raise ValueError("the platform doesn't support SO_REUSEPORT")
    
        sockets = []
        if address == "":
            address = None
        if not socket.has_ipv6 and family == socket.AF_UNSPEC:
            # Python can be compiled with --disable-ipv6, which causes
            # operations on AF_INET6 sockets to fail, but does not
            # automatically exclude those results from getaddrinfo
            # results.
            # http://bugs.python.org/issue16208
            family = socket.AF_INET
        if flags is None:
            flags = socket.AI_PASSIVE
        bound_port = None
        unique_addresses = set()  # type: set
        for res in sorted(
            socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags),
            key=lambda x: x[0],
        ):
            if res in unique_addresses:
                continue
    
            unique_addresses.add(res)
    
            af, socktype, proto, canonname, sockaddr = res
            if (
                sys.platform == "darwin"
                and address == "localhost"
                and af == socket.AF_INET6
                and sockaddr[3] != 0
            ):
                # Mac OS X includes a link-local address fe80::1%lo0 in the
                # getaddrinfo results for 'localhost'.  However, the firewall
                # doesn't understand that this is a local address and will
                # prompt for access (often repeatedly, due to an apparent
                # bug in its ability to remember granting access to an
                # application). Skip these addresses.
                continue
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if errno_from_exception(e) == errno.EAFNOSUPPORT:
                    continue
                raise
            if os.name != "nt":
                try:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                except socket.error as e:
                    if errno_from_exception(e) != errno.ENOPROTOOPT:
                        # Hurd doesn't support SO_REUSEADDR.
                        raise
            if reuse_port:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
    
            # automatic port allocation with port=None
            # should bind on the same port on IPv4 and IPv6
            host, requested_port = sockaddr[:2]
            if requested_port == 0 and bound_port is not None:
                sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
    
            sock.setblocking(False)
            try:
>               sock.bind(sockaddr)
E               OSError: [Errno 48] Address already in use

../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/netutil.py:161: OSError

During handling of the above exception, another exception occurred:

    def test_dont_select_closed_worker():
        # Make sure distributed does not try to reuse a client from a
        # closed cluster (https://github.com/dask/distributed/issues/2840).
        with clean(threads=False):
>           cluster = LocalCluster(n_workers=0, dashboard_address=":0")

distributed/deploy/tests/test_local.py:914: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/deploy/local.py:236: in __init__
    super().__init__(
distributed/deploy/spec.py:260: in __init__
    self.sync(self._start)
distributed/utils.py:320: in sync
    return sync(
distributed/utils.py:387: in sync
    raise exc.with_traceback(tb)
distributed/utils.py:360: in f
    result = yield future
../../miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/gen.py:762: in run
    value = future.result()
distributed/deploy/spec.py:303: in _start
    await self._close()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = LocalCluster(0af1b62c, '<Not Connected>', workers=0, threads=0, memory=0 B)

    async def _close(self):
        while self.status == Status.closing:
            await asyncio.sleep(0.1)
        if self.status == Status.closed:
            return
        if self.status == Status.running or self.status == Status.failed:
            self.status = Status.closing
    
            # Need to call stop here before we close all servers to avoid having
            # dangling tasks in the ioloop
            with suppress(AttributeError):
                self._adaptive.stop()
    
            f = self.scale(0)
            if isawaitable(f):
                await f
            await self._correct_state()
            await asyncio.gather(*self._futures)
    
            if self.scheduler_comm:
                async with self._lock:
                    with suppress(OSError):
                        await self.scheduler_comm.terminate()
                    await self.scheduler_comm.close_rpc()
            else:
                logger.warning("Cluster closed without starting up")
    
>           await self.scheduler.close()
E           AttributeError: 'NoneType' object has no attribute 'close'

distributed/deploy/spec.py:419: AttributeError

@fjetter
Copy link
Member

fjetter commented Jun 8, 2022

       await self.scheduler.close()

E AttributeError: 'NoneType' object has no attribute 'close'

Often indicates an error during startup. Indeed, it looks like there is a port conflict while starting the HTTP server.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants