Skip to content

Commit

Permalink
Support dumping cluster state to URL (#5863)
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 authored Mar 10, 2022
1 parent 936fba5 commit 30f0b60
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 110 deletions.
157 changes: 76 additions & 81 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from tornado import gen
from tornado.ioloop import PeriodicCallback

from . import preloading
from . import cluster_dump, preloading
from . import versions as version_module # type: ignore
from .batched import BatchedSend
from .cfexecutor import ClientExecutor
Expand Down Expand Up @@ -3826,108 +3826,73 @@ def scheduler_info(self, **kwargs):
self.sync(self._update_scheduler_info)
return self._scheduler_identity

async def _dump_cluster_state(
    self,
    filename: str,
    exclude: Collection[str],
    format: Literal["msgpack", "yaml"],
) -> None:
    """Collect the cluster state on the client and write it to a local file.

    The scheduler state, the state of every worker and the version
    information are requested concurrently, merged into one dict with the
    keys ``"scheduler"``, ``"workers"`` and ``"versions"``, and written to
    ``filename`` as gzipped msgpack or as YAML.

    Parameters
    ----------
    filename:
        Output path. ``.msgpack.gz`` or ``.yaml`` is appended when the path
        does not already end with it.
    exclude:
        Attribute names forwarded to the ``dump_state`` calls.
    format:
        ``"msgpack"`` or ``"yaml"``.

    Raises
    ------
    ValueError
        If ``format`` is neither ``"msgpack"`` nor ``"yaml"``. This is only
        detected after the state has been gathered.
    """
    # Create all three requests up front, then wait for them together.
    scheduler_info, workers_info, versions_info = await asyncio.gather(
        self.scheduler.dump_state(exclude=exclude),
        self.scheduler.broadcast(
            msg={"op": "dump_state", "exclude": exclude},
            on_error="return_pickle",
        ),
        self._get_versions(),
    )

    def as_lists(obj):
        # The RPC layer hands back tuples where lists were sent.
        if isinstance(obj, dict):
            return {key: as_lists(val) for key, val in obj.items()}
        if isinstance(obj, (list, tuple)):
            return [as_lists(item) for item in obj]
        return obj

    state = as_lists(
        {
            "scheduler": scheduler_info,
            # Failed workers answer with a pickled error (bytes); store its repr.
            "workers": {
                addr: repr(loads(reply)) if isinstance(reply, bytes) else reply
                for addr, reply in workers_info.items()
            },
            "versions": versions_info,
        }
    )

    path = str(filename)

    if format == "msgpack":
        import gzip

        import msgpack

        if not path.endswith(".msgpack.gz"):
            path += ".msgpack.gz"
        with gzip.open(path, "wb") as gz_out:
            msgpack.pack(state, gz_out)
        return

    if format == "yaml":
        import yaml

        if not path.endswith(".yaml"):
            path += ".yaml"
        with open(path, "w") as text_out:
            yaml.dump(state, text_out)
        return

    raise ValueError(
        f"Unsupported format {format}. Possible values are `msgpack` or `yaml`"
    )

def dump_cluster_state(
self,
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = ("run_spec",),
format: Literal["msgpack", "yaml"] = "msgpack",
**storage_options,
):
"""Extract a dump of the entire cluster state and persist to disk.
"""Extract a dump of the entire cluster state and persist to disk or a URL.
This is intended for debugging purposes only.
Warning: Memory usage on client side can be large.
Warning: Memory usage on the scheduler (and client, if writing the dump locally)
can be large. On a large or long-running cluster, this can take several minutes.
The scheduler may be unresponsive while the dump is processed.
Results will be stored in a dict::
{
"scheduler_info": {...},
"worker_info": {
worker_addr: {...}, # worker attributes
"scheduler": {...}, # scheduler state
"workers": {
worker_addr: {...}, # worker state
...
}
"versions": {
"scheduler": {...},
"workers": {
worker_addr: {...},
...
}
}
}
Parameters
----------
filename:
The output filename. The appropriate file suffix (`.msgpack.gz` or
`.yaml`) will be appended automatically.
The path or URL to write to. The appropriate file suffix (``.msgpack.gz`` or
``.yaml``) will be appended automatically.
Must be a path supported by :func:`fsspec.open` (like ``s3://my-bucket/cluster-dump``,
or ``cluster-dumps/dump``). See ``write_from_scheduler`` to control whether
the dump is written directly to ``filename`` from the scheduler, or sent
back to the client over the network, then written locally.
write_from_scheduler:
If None (default), infer based on whether ``filename`` looks like a URL
or a local path: True if the filename contains ``://`` (like
``s3://my-bucket/cluster-dump``), False otherwise (like ``local_dir/cluster-dump``).
If True, write cluster state directly to ``filename`` from the scheduler.
If ``filename`` is a local path, the dump will be written to that
path on the *scheduler's* filesystem, so be careful if the scheduler is running
on ephemeral hardware. Useful when the scheduler is attached to a network
filesystem or persistent disk, or for writing to buckets.
If False, transfer cluster state from the scheduler back to the client
over the network, then write it to ``filename``. This is much less
efficient for large dumps, but useful when the scheduler doesn't have
access to any persistent storage.
exclude:
A collection of attribute names which are supposed to be excluded
from the dump, e.g. to exclude code, tracebacks, logs, etc.
Defaults to exclude `run_spec` which is the serialized user code. This
is typically not required for debugging. To allow serialization of
this, pass an empty tuple.
Defaults to exclude ``run_spec``, which is the serialized user code.
This is typically not required for debugging. To allow serialization
of this, pass an empty tuple.
format:
Either msgpack or yaml. If msgpack is used (default), the output
will be stored in a gzipped file as msgpack.
Either ``"msgpack"`` or ``"yaml"``. If msgpack is used (default),
the output will be stored in a gzipped file as msgpack.
To read::
Expand All @@ -3944,15 +3909,45 @@ def dump_cluster_state(
from yaml import Loader
with open("filename") as fd:
state = yaml.load(fd, Loader=Loader)
**storage_options:
Any additional arguments to :func:`fsspec.open` when writing to a URL.
"""
return self.sync(
self._dump_cluster_state,
filename=filename,
format=format,
write_from_scheduler=write_from_scheduler,
exclude=exclude,
format=format,
**storage_options,
)

async def _dump_cluster_state(
    self,
    filename: str = "dask-cluster-dump",
    write_from_scheduler: bool | None = None,
    exclude: Collection[str] = ("run_spec",),
    format: Literal["msgpack", "yaml"] = "msgpack",
    **storage_options,
):
    """Write a cluster state dump, either from the scheduler or from here.

    Parameters
    ----------
    filename:
        Path or URL of the dump; converted with ``str()`` before use.
    write_from_scheduler:
        When ``None``, it is inferred: true if ``filename`` contains
        ``"://"``, false otherwise. When true, the scheduler is asked to
        write the dump itself. Otherwise the state is fetched from the
        scheduler and written through ``cluster_dump.write_state``.
    exclude:
        Attribute names to leave out of the dump.
    format:
        ``"msgpack"`` or ``"yaml"``.
    **storage_options:
        Passed through unchanged to whichever writer is used.
    """
    target = str(filename)
    from_scheduler = (
        "://" in target if write_from_scheduler is None else write_from_scheduler
    )

    if not from_scheduler:
        # Pull the state over the network and write it on the client side.
        fetch_state = partial(self.scheduler.get_cluster_state, exclude=exclude)
        await cluster_dump.write_state(
            fetch_state,
            target,
            format,
            **storage_options,
        )
        return

    await self.scheduler.dump_cluster_state_to_url(
        url=target,
        exclude=exclude,
        format=format,
        **storage_options,
    )

def write_scheduler_file(self, scheduler_file):
"""Write the scheduler information to a json file.
Expand Down
59 changes: 59 additions & 0 deletions distributed/cluster_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"Utilities for generating and analyzing cluster dumps"

from __future__ import annotations

from typing import IO, Any, Awaitable, Callable, Literal

import fsspec
import msgpack

from distributed.compatibility import to_thread


def _tuple_to_list(node):
    """Return a copy of ``node`` with every nested tuple turned into a list.

    Lists and tuples become lists, dicts are rebuilt with converted values
    (keys are left untouched), and any other object is returned as-is.
    """
    if isinstance(node, dict):
        return {key: _tuple_to_list(value) for key, value in node.items()}
    if isinstance(node, (list, tuple)):
        return list(map(_tuple_to_list, node))
    return node


async def write_state(
    get_state: Callable[[], Awaitable[Any]],
    url: str,
    format: Literal["msgpack", "yaml"],
    **storage_options: dict[str, Any],
) -> None:
    """Await a cluster dump, then serialize and write it to ``url``.

    Parameters
    ----------
    get_state:
        Zero-argument callable returning an awaitable that yields the state.
        It is only awaited once the destination has been opened.
    url:
        Destination handed to ``fsspec.open``. The suffix for the chosen
        format (``.msgpack.gz`` or ``.yaml``) is appended if missing.
    format:
        ``"msgpack"`` or ``"yaml"``.
    **storage_options:
        Extra keyword arguments for ``fsspec.open``.

    Raises
    ------
    ValueError
        If ``format`` is not one of the supported values. This is raised
        before anything is opened or awaited.
    """
    if format == "msgpack":
        mode, suffix, writer = "wb", ".msgpack.gz", msgpack.pack
    elif format == "yaml":
        import yaml

        mode, suffix = "w", ".yaml"

        def writer(state: dict, f: IO):
            # YAML adds unnecessary `!!python/tuple` tags; convert tuples to lists
            # to avoid them. msgpack encodes tuples and lists the same way, so it
            # does not need this step.
            yaml.dump(_tuple_to_list(state), f)

    else:
        raise ValueError(
            f"Unsupported format {format!r}. Possible values are 'msgpack' or 'yaml'."
        )

    if not url.endswith(suffix):
        url += suffix

    # Open the destination first so that a bad URL or bad credentials fail
    # before the expensive dump is produced.
    # NOTE: `compression="infer"` picks gzip from the `.gz` suffix.
    with fsspec.open(url, mode, compression="infer", **storage_options) as sink:
        snapshot = await get_state()
        # Serialize in a worker thread so the event loop is blocked less
        # (the writer will still hold the GIL for much of the time).
        await to_thread(writer, snapshot, sink)
54 changes: 53 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from datetime import timedelta
from functools import partial
from numbers import Number
from typing import ClassVar, Literal
from typing import Any, ClassVar, Dict, Literal
from typing import cast as pep484_cast

import psutil
Expand All @@ -51,6 +51,7 @@
from dask.utils import format_bytes, format_time, parse_bytes, parse_timedelta, tmpfile
from dask.widgets import get_template

from distributed import cluster_dump
from distributed.utils import recursive_to_dict

from . import preloading, profile
Expand Down Expand Up @@ -3970,6 +3971,8 @@ def __init__(
"subscribe_worker_status": self.subscribe_worker_status,
"start_task_metadata": self.start_task_metadata,
"stop_task_metadata": self.stop_task_metadata,
"get_cluster_state": self.get_cluster_state,
"dump_cluster_state_to_url": self.dump_cluster_state_to_url,
}

connection_limit = get_fileno_limit() / 2
Expand Down Expand Up @@ -4082,6 +4085,55 @@ def _to_dict(
info.update(recursive_to_dict(extra, exclude=exclude))
return info

async def get_cluster_state(
    self,
    exclude: "Collection[str]",
) -> dict:
    """Produce the state dict used in a cluster state dump.

    Returns a dict with three keys: ``"scheduler"`` (the result of
    ``self._to_dict``), ``"workers"`` (the per-worker ``dump_state``
    replies, with any exception replaced by its ``repr``) and
    ``"versions"`` (scheduler and worker version information).
    """
    # Start the worker requests before `self._to_dict` blocks the event loop.
    state_request = self.broadcast(
        msg={"op": "dump_state", "exclude": exclude},
        on_error="return",
    )
    versions_request = self.broadcast(
        msg={"op": "versions"},
        on_error="ignore",
    )
    pending = asyncio.gather(state_request, versions_request)
    try:
        scheduler_state = self._to_dict(exclude=exclude)

        worker_states, worker_versions = await pending
    finally:
        # Make sure the requests do not keep running if anything above failed.
        # Someday (py3.11), use a trio-style TaskGroup for this.
        pending.cancel()

    # RPC errors come back as exception objects; keep only their repr.
    readable_states = {}
    for address, reply in worker_states.items():
        readable_states[address] = (
            repr(reply) if isinstance(reply, Exception) else reply
        )

    return {
        "scheduler": scheduler_state,
        "workers": readable_states,
        "versions": {"scheduler": self.versions(), "workers": worker_versions},
    }

async def dump_cluster_state_to_url(
    self,
    url: str,
    exclude: "Collection[str]",
    format: Literal["msgpack", "yaml"],
    **storage_options: Dict[str, Any],
) -> None:
    """Write a cluster state dump to an fsspec-compatible URL.

    The state is produced by ``self.get_cluster_state(exclude)`` and written
    by ``cluster_dump.write_state``. ``storage_options`` are forwarded to
    that writer unchanged.
    """
    # Hand over a callable, not the state, so the writer chooses when to fetch it.
    fetch_state = partial(self.get_cluster_state, exclude)
    await cluster_dump.write_state(fetch_state, url, format, **storage_options)

def get_worker_service_addr(self, worker, service_name, protocol=False):
"""
Get the (host, port) address of the named service on the *worker*.
Expand Down
Loading

0 comments on commit 30f0b60

Please sign in to comment.