Unify annotations #4406
Conversation
Force-pushed from 516ade9 to 1a8e4a0
Thanks for your work here @ian-r-rose. It looks like there are some merge conflicts which are preventing CI from running. When you get a chance, would you mind merging/rebasing master?
Force-pushed from d0c3740 to 469450f
We've had some out-of-band discussion about the ability to annotate specific collections with things like c.compute(x, retries={x: 2}). This worked for some collections, but not others. In 6e0695b I removed that ability, so that should be a good overview of what some of the consequences are. This is a breaking change, but perhaps of some functionality that was not widely used. These per-collection annotations can be accomplished with the new syntax using something like

with dask.annotate(retries=2):
    x = da.ones((1, 1))
c.compute(x)

This largely works, with the caveat that dask/dask#7036 should be fixed to get around some surprising results.
Force-pushed from ab6c85e to e63cbbe
Whoops. It looks like I might have had some lingering comments in review. Submitting now, although they may be out of date.
assert s.host_restrictions[L[2].key] == {b.ip}

with pytest.raises(ValueError):
    c.map(inc, [10, 11, 12], workers=[{a.ip}])
I'm curious why these were removed.
I was generally trying to avoid any expansion of keys/collections on the client, instead preferring to only allow a worker or an iterable of workers. With that API restriction, the unpacking/expanding logic on the scheduler is much simpler. So I view this as philosophically aligned with removing the ability to expand { collection: worker } on the client.
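As a rough illustration of that restriction (not code from this PR; the throwaway LocalCluster and the way the addresses are looked up are purely for the example), the forms of workers= that remain supported look like this:

```python
from distributed import Client, LocalCluster

def inc(x):
    return x + 1

# A small local cluster just so the example has real worker addresses.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
addr_a, addr_b = sorted(client.scheduler_info()["workers"])

# Still supported: a single worker address, or an iterable of addresses.
f1 = client.submit(inc, 1, workers=addr_a)
f2 = client.submit(inc, 2, workers=[addr_a, addr_b])
assert f1.result() == 2 and f2.result() == 3

# Removed by this PR: expanding a {collection: worker} mapping on the client,
# e.g. client.compute(x, workers={x: addr_a}).
```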
Thanks for taking a look @mrocklin, I've responded to your comments inline.

I would also flag: there are a lot of tests which now have optimize_graph=False. This is due to dask/dask#7036, and should give an indication of what kinds of situations that affects.
Checking in. How are we doing here?

I think this is ready for review from my perspective. Tests are passing (except some that seem flaky and unrelated?), the major question in my mind is whether the backwards-incompatibilities introduced are worth it, and what to do about dask/dask#7036.

@sjperkins are you around to review?

Sure, will take a look over the next day or two. Thanks for working on this @ian-r-rose
I've added some comments: I'm most interested in the one concerning annotation expansion.

> I would also flag: there are a lot of tests which now have optimize_graph=False. This is due to dask/dask#7036, and should give an indication of what kinds of situations that affects.

I might suggest commenting each optimize_graph=False case to indicate that it should be removed when dask/dask#7036 is fixed. It's a bit laborious and ugly, but I can't think of a better way to handle this at present.
    new_annotations[k] = merge(annotations[k], v)
else:
    new_annotations[k] = v
annotations.update(new_annotations)
I understand this change provides backwards compatibility for the existing per-key Client.{submit, persist} kwarg taxonomy (retries, priority, workers)?

The functionality in _materialized_layer_unpack mirrors Layer.__dask_distributed_unpack__ and all its subclasses. They also need to expand and unpack annotations.

I wonder if it would be better to place this functionality in Layer.expand_annotations so that it is automatically applied through the Layer class hierarchy? It would require a sister dask PR though.
Thanks for pointing this out. This is actually not for backwards compatibility of per-key/collection annotations; instead it is needed to handle the case where different HLG Layers have different retries/priority/workers.

For example, if we are unpacking a two-layer HLG, and Layer A has retries: 2 while Layer B has retries: 4, we need to make sure that the unpacking keeps both of those annotations. The current implementation has a shallow merge, which makes later-unpacked layers overwrite the annotations of previously-unpacked layers. To make the above case work, we need a somewhat deeper merge.

For the same reason, I also think this logic needs to live one step above Layer.expand_annotations.
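A minimal, self-contained sketch of the problem being described (the key names and values are made up for illustration; this is not the distributed implementation):

```python
# Annotations are stored as {annotation_name: {task_key: value}}.
annotations = {"retries": {"a-key-0": 2}}   # already unpacked from Layer A
layer_b = {"retries": {"b-key-0": 4}}       # annotations arriving from Layer B

# Shallow merge: Layer B's "retries" dict replaces Layer A's entirely.
shallow = {**annotations, **layer_b}
assert shallow == {"retries": {"b-key-0": 4}}   # Layer A's retries are lost

# Deeper merge: merge the per-key dicts under each annotation name instead.
deep = dict(annotations)
for name, per_key in layer_b.items():
    deep[name] = {**deep.get(name, {}), **per_key}
assert deep == {"retries": {"a-key-0": 2, "b-key-0": 4}}
```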
Yes, you're right. Thanks for correcting my understanding.

new_annotations[k] = {**annotations[k], **v} may be faster than tlz.merge now that dask supports Python >= 3.6.
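For what it's worth, a quick check (with invented values) that the dict-unpacking form gives the same result as tlz.merge for this use case:

```python
from tlz import merge  # tlz dispatches to cytoolz or toolz, shipped with dask

annotations = {"retries": {"a": 2}}
v = {"b": 4}
k = "retries"

# Both forms merge the two dicts, with values from v winning on key collisions.
assert {**annotations[k], **v} == merge(annotations[k], v) == {"a": 2, "b": 4}
```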
cc'ing @madsbk
> The functionality in _materialized_layer_unpack mirrors Layer.__dask_distributed_unpack__ and all its subclasses. They also need to expand and unpack annotations.

I think this is still an issue; we need to apply the same changes to Blockwise.__dask_distributed_unpack__ and SimpleShuffleLayer.__dask_distributed_unpack__.

Maybe it makes more sense to redesign the annotations by removing the annotations argument from Layer.__dask_distributed_unpack__ and then have highlevelgraph_unpack() expand and unpack annotations just after unpack_func(layer["state"], dsk, deps).

This will simplify all the __dask_distributed_unpack__ implementations and remove duplicate code. The only downside is that it prevents Layers from doing a specialized annotation expansion, which no one is doing currently. Unless I am missing something, I think that is a great trade-off.
Thanks for the detailed discussion @sjperkins and @madsbk. I've tried to do something like a compromise solution in dask/dask#7102, and I'd love to hear your thoughts on it. It takes a somewhat minimal approach to API changes (so __dask_distributed_unpack__() still takes the annotations), and avoids further imports of distributed there. Overall, the strategy I'm trying out is:

- Move the default implementation of layer unpacking into Layer.
- Move the logic of doing a deeper merge on annotations into a new Layer.merge_annotations, similar to Layer.expand_annotations. I am not overly attached to this division of responsibility, so if there are suggestions for how to make it cleaner, I'd be happy to hear them.
- Ensure that Layer, Blockwise, and ShuffleLayer all unpack annotations properly (previously the latter two failed).
LGTM @ian-r-rose, but I think @sjperkins has a point: we should unify everything in Dask and change the API such that layers can delegate the annotation unpacking to Layer.__dask_distributed_unpack__().

I suggest that we split this work into multiple PRs. Let's roll back this PR to the state that didn't require any significant change to the API and get it merged. It greatly simplifies the code and is a win in all cases.

A followup PR can then modify the API based on the design thoughts we have been discussing here. I will be happy to initiate the followup PR; I think I understand @sjperkins's design ideas and concerns now :)
Thanks for the discussion @madsbk and @ian-r-rose. I agree with moving Layer API changes forward to a new PR, so don't let my concerns hold this one up. LGTM
@madsbk I'd be happy to have this be across multiple PRs. Can you specify what API changes you would want to roll back? This now depends on dask/dask#7102, but that is somewhat more of a bugfix than a major API change.

I could bring the default implementation of Layer.__dask_distributed_unpack__() back here and then xfail the test that catches dask/dask#7102, if that's what you mean.
Sorry, my bad, I thought you changed the API in the last commit. I am totally fine with this PR + dask/dask#7102.
if layer["__module__"] is None:  # Default implementation
    unpack_func = _materialized_layer_unpack
else:
    mod = import_allowed_module(layer["__module__"])
    unpack_func = getattr(mod, layer["__name__"]).__dask_distributed_unpack__
-unpack_func(layer["state"], dsk, deps, annotations)
+unpack_func(layer["state"], dsk, deps, out_annotations)
To me this implies that Layer annotations always take precedence over the existing Client.{submit, persist} kwarg taxonomy. I think this makes sense in terms of backwards compatibility (specific annotations override global annotations).
@mrocklin You mentioned offline that passing in […] For simplicity's sake, this PR has tried to completely move such annotations to live at the HLG Layer level, and remove the older ways of doing it, including […]

I've tried to avoid a proliferation of such special cases in the pack/unpack logic. We could revisit that, but at least at the moment, retries/priority/workers/etc. only take very simple non-graph objects.
Overall this looks great @ian-r-rose, but I think we should consider simplifying the annotation design in order to avoid duplicate annotation implementations in each Layer (see comment).
Previously we had two systems to send per-task metadata like retries or workers or priorities to the scheduler:

1. Older system with explicit workers= keywords and expand_foo functions
2. Newer system with annotations

The annotations system is nicer for a few reasons:

1. It's more generic
2. It's more consistent (there were some bugs in the expand_foo functions, especially when dealing with collections)
3. We ship values up on a per-layer basis rather than a per-key basis

This work-in-progress commit rips out the old system and uses the new system, but it is still missing a lot:

1. It only implements this for the Client.compute method. We need to repeat this for persist, submit, and map
2. It doesn't handle the allow_other_workers -> loose_restrictions conversion anywhere yet (this will need to be added to the scheduler)

rebase fail
side and pass them unmodified to the scheduler.
version of doing things.
compute/persist. These can still be accomplished in an annotation context.
Force-pushed from 999b04b to 066c00e
N.B.: now that this depends on dask/dask#7102, tests are failing, but they should pass once that PR is resolved.
Force-pushed from 066c00e to b98a1f9
How are we doing here?

dask/dask#7102 (on which this now depends) has been merged. I think this is ready (test failures look unrelated to me).

My sense is this is good for a final review and merge.

Note, I've been ignoring Travis failures for a while now, for better or worse.
Thanks for all your work on this @ian-r-rose! I've left a few comments below, but overall the changes here look great. Thanks for adding a bunch of tests.

We might also consider updating https://distributed.dask.org/en/latest/resources.html to reflect the new annotations approach for collections.
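For reference, the kind of example that page could show with the new approach (a sketch only; the resource name and chunk sizes are arbitrary, and it assumes workers were started with a matching resource, e.g. dask-worker <scheduler-address> --resources "GPU=1"):

```python
import dask
import dask.array as da

# Attach a resource requirement to the layers built inside the context; the
# annotation travels with the HighLevelGraph and is honoured by the scheduler
# when the collection is computed on a distributed cluster.
with dask.annotate(resources={"GPU": 1}):
    x = da.ones((1000, 1000), chunks=(100, 100)).sum()
```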
distributed/tests/test_client.py (Outdated)
@pytest.mark.skipif(
    not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
)
@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
async def test_restrictions_get_annotate(c, s, a, b):
Since the workers= annotation also supports worker addresses in addition to worker hostnames, I think we can update this to remove the pytest.mark.skipif while testing the same behavior:
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index eb1c362d..0a271596 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -957,15 +957,12 @@ async def test_restrictions_get(c, s, a, b):
     assert len(b.data) == 0


-@pytest.mark.skipif(
-    not sys.platform.startswith("linux"), reason="Need 127.0.0.2 to mean localhost"
-)
-@gen_cluster([("127.0.0.1", 1), ("127.0.0.2", 2)], client=True)
+@gen_cluster(client=True)
 async def test_restrictions_get_annotate(c, s, a, b):
     x = 1
-    with dask.annotate(workers=a.ip):
+    with dask.annotate(workers=a.address):
         y = delayed(inc)(x)
-    with dask.annotate(workers=b.ip):
+    with dask.annotate(workers=b.address):
         z = delayed(inc)(y)

     futures = c.get(z.__dask_graph__(), [y.key, z.key], sync=False)
This is particularly nice given our current Linux testing situation on Travis.
assert s.loose_restrictions == {total2.key} | {v.key for v in L2}


@nodebug  # test timing is fragile
@gen_cluster(nthreads=[("127.0.0.1", 1)] * 3, client=True)
async def test_persist_workers(e, s, a, b, c):
Just for my own understanding, it looks like test_persist_workers and test_compute_workers have been updated to specifically not run tasks on a worker, which is easier to specify with the workers= keyword, instead of running certain collections on certain workers. We're now covering the certain-collections-on-certain-workers functionality with the new test_persist_workers_annotate and test_compute_workers_annotate tests. Is that correct?
I've updated them to persist/compute on a given set of workers, but it is no longer possible to delegate certain tasks/keys to a specific worker in a persist/compute call. In order to do that, you now need to use the annotation framework.

> We're now covering the certain collections on certain workers functionality with new test_persist_workers_annotate and test_compute_workers_annotate tests. Is that correct?

Correct
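A hedged sketch of what that looks like with the annotation framework (the LocalCluster, addresses, and array shapes are invented for illustration; optimize_graph=False reflects the dask/dask#7036 caveat discussed above, not a permanent requirement):

```python
import dask
import dask.array as da
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
addr_a, addr_b = sorted(client.scheduler_info()["workers"])

# Previously: client.persist(x, workers={x: addr_a}) could pin a collection.
# Now: annotate each collection as it is built, then persist/compute normally.
with dask.annotate(workers=[addr_a]):
    x = da.ones((10, 10), chunks=(5, 5))
with dask.annotate(workers=[addr_b]):
    y = x + 1

x, y = client.persist([x, y], optimize_graph=False)
```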
Just wanted to point out that the CI is starting to fail here (xref https://github.com/dask/distributed/pull/4467/checks?check_run_id=1786579413) because of the updates in dask/dask#7102. Merging this PR will resolve the AttributeError: type object 'Layer' has no attribute 'expand_annotations' issue.
Thanks @ian-r-rose for working on this and @sjperkins @madsbk @mrocklin for reviewing!
Thanks for the reviews, all!
Thanks @ian-r-rose and congratulations on treading a path through all the discussion :-D |
Continues/supersedes #4347, using the task annotation machinery to specify workers/priority/retries/etc when submitting code to the scheduler. Fixes #4262