Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Formalization of Computation #4923

Closed
wants to merge 11 commits into from
Closed
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.7.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ dependencies:
- zict
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/madsbk/dask.git@formalization_of_computation
- git+https://github.com/jcrist/crick # Only tested here
- keras
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ dependencies:
- zict
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/madsbk/dask.git@formalization_of_computation
- keras
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dependencies:
- zict # overridden by git tip below
- zstandard
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/madsbk/dask.git@formalization_of_computation
- git+https://github.com/dask/s3fs
- git+https://github.com/dask/zict
- git+https://github.com/intake/filesystem_spec
Expand Down
5 changes: 3 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
from .metrics import time
from .objects import HasWhat, SchedulerInfo, WhoHas
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .protocol.computation import PickledObject
from .protocol.pickle import dumps
from .publish import Datasets
from .pubsub import PubSubClientExtension
from .security import Security
Expand Down Expand Up @@ -1325,7 +1326,7 @@ def _handle_key_in_memory(self, key=None, type=None, workers=None):
if state is not None:
if type and not state.type: # Type exists and not yet set
try:
type = loads(type)
type = PickledObject.deserialize(type)
except Exception:
type = None
# Here, `type` may be a str if actual type failed
Expand Down
8 changes: 4 additions & 4 deletions distributed/diagnostics/eventstream.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from ..core import coerce_to_address, connect
from ..worker import dumps_function
from ..protocol.computation import PickledCallable
from .plugin import SchedulerPlugin

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,10 +62,10 @@ async def eventstream(address, interval):
await comm.write(
{
"op": "feed",
"setup": dumps_function(EventStream),
"function": dumps_function(swap_buffer),
"setup": PickledCallable.serialize(EventStream),
"function": PickledCallable.serialize(swap_buffer),
"interval": interval,
"teardown": dumps_function(teardown),
"teardown": PickledCallable.serialize(teardown),
}
)
return comm
8 changes: 4 additions & 4 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from tlz import merge, valmap

from ..core import coerce_to_address, connect
from ..protocol.computation import PickledCallable
from ..scheduler import Scheduler
from ..utils import color_of, key_split
from ..worker import dumps_function
from .progress import AllProgress

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,10 +50,10 @@ async def progress_stream(address, interval):
await comm.write(
{
"op": "feed",
"setup": dumps_function(AllProgress),
"function": dumps_function(counts),
"setup": PickledCallable.serialize(AllProgress),
"function": PickledCallable.serialize(counts),
"interval": interval,
"teardown": dumps_function(remove_plugin),
"teardown": PickledCallable.serialize(remove_plugin),
}
)
return comm
Expand Down
16 changes: 7 additions & 9 deletions distributed/diagnostics/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
from tlz import valmap
from tornado.ioloop import IOLoop

import dask

from ..client import default_client, futures_of
from ..core import CommClosedError, coerce_to_address, connect
from ..protocol.pickle import dumps
from ..utils import LoopRunner, is_kernel, key_split
from ..protocol.computation import PickledCallable
from ..utils import LoopRunner, is_kernel, key_split, parse_timedelta
from .progress import MultiProgress, Progress, format_time

logger = logging.getLogger(__name__)
Expand All @@ -36,7 +34,7 @@ def __init__(self, keys, scheduler=None, interval="100ms", complete=True):
break

self.keys = {k.key if hasattr(k, "key") else k for k in keys}
self.interval = dask.utils.parse_timedelta(interval, default="s")
self.interval = parse_timedelta(interval, default="s")
self.complete = complete
self._start_time = default_timer()

Expand Down Expand Up @@ -71,8 +69,8 @@ def function(scheduler, p):
await self.comm.write(
{
"op": "feed",
"setup": dumps(setup),
"function": dumps(function),
"setup": PickledCallable.serialize(setup),
"function": PickledCallable.serialize(function),
"interval": self.interval,
},
serializers=self.client()._serializers if self.client else None,
Expand Down Expand Up @@ -263,8 +261,8 @@ def function(scheduler, p):
await self.comm.write(
{
"op": "feed",
"setup": dumps(setup),
"function": dumps(function),
"setup": PickledCallable.serialize(setup),
"function": PickledCallable.serialize(function),
"interval": self.interval,
}
)
Expand Down
10 changes: 3 additions & 7 deletions distributed/diagnostics/tests/test_widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,14 @@ def record_display(*args):
import re
from operator import add

from tlz import valmap

from distributed.client import wait
from distributed.diagnostics.progressbar import (
MultiProgressWidget,
ProgressWidget,
progress,
)
from distributed.protocol.computation import typeset_dask_graph
from distributed.utils_test import dec, gen_cluster, gen_tls_cluster, inc, throws
from distributed.worker import dumps_task


@gen_cluster(client=True)
Expand Down Expand Up @@ -146,8 +144,7 @@ async def test_multi_progressbar_widget(c, s, a, b):
@gen_cluster()
async def test_multi_progressbar_widget_after_close(s, a, b):
s.update_graph(
tasks=valmap(
dumps_task,
tasks=typeset_dask_graph(
{
"x-1": (inc, 1),
"x-2": (inc, "x-1"),
Expand Down Expand Up @@ -232,8 +229,7 @@ def test_progressbar_cancel(client):
@gen_cluster()
async def test_multibar_complete(s, a, b):
s.update_graph(
tasks=valmap(
dumps_task,
tasks=typeset_dask_graph(
(Review note: madsbk marked this conversation as resolved. Show resolved / Hide resolved)
{
"x-1": (inc, 1),
"x-2": (inc, "x-1"),
Expand Down
Loading