Skip to content

Commit

Permalink
Use unpacking default implementation for Layer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Rose committed Jan 26, 2021
1 parent 3305e07 commit 066c00e
Showing 1 changed file with 1 addition and 17 deletions.
18 changes: 1 addition & 17 deletions distributed/protocol/highlevelgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,6 @@ def highlevelgraph_pack(hlg: HighLevelGraph, client, client_keys):
return dumps_msgpack({"layers": layers})


def _materialized_layer_unpack(state, dsk, dependencies, annotations):
dsk.update(state["dsk"])
for k, v in state["dependencies"].items():
dependencies[k] = list(set(dependencies.get(k, ())) | set(v))

if state["annotations"]:
expanded = Layer.expand_annotations(state["annotations"], state["dsk"].keys())
new_annotations = {}
for k, v in expanded.items():
if isinstance(v, dict) and k in annotations:
new_annotations[k] = {**annotations[k], **v}
else:
new_annotations[k] = v
annotations.update(new_annotations)


def highlevelgraph_unpack(dumped_hlg, annotations: dict):
"""Unpack the high level graph for Scheduler -> Worker communication
Expand Down Expand Up @@ -178,7 +162,7 @@ def highlevelgraph_unpack(dumped_hlg, annotations: dict):
layer["state"]["annotations"] = {}
layer["state"]["annotations"].update(annotations)
if layer["__module__"] is None: # Default implementation
unpack_func = _materialized_layer_unpack
unpack_func = Layer.__dask_distributed_unpack__
else:
mod = import_allowed_module(layer["__module__"])
unpack_func = getattr(mod, layer["__name__"]).__dask_distributed_unpack__
Expand Down

0 comments on commit 066c00e

Please sign in to comment.