Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass .chunk/rechunk calls through for chunked arrays without ChunkManagers #9286

Open
wants to merge 34 commits into
base: main
Choose a base branch
from

Conversation

TomNicholas
Copy link
Member

Basically implements @dcherian 's suggestion from #8733 (comment):

IIUC this mostly gets resolved if the ChunkManager is less greedy and doesn't trigger on the existence of .chunks but is instead triggered on matching an allowlist of registered chunk array types.

Needed to fix zarr-developers/VirtualiZarr#199 (comment).

The actual fix is in just the first two commits, the rest is defining a new has_chunkmanager function and using that everywhere to distinguish between arrays that have .chunks (e.g. virtualizarr.ManifestArray) and arrays that actually need to call out to a ChunkManager (i.e. dask/cubed).

@TomNicholas TomNicholas added the topic-chunked-arrays Managing different chunked backends, e.g. dask label Jul 26, 2024
@TomNicholas TomNicholas marked this pull request as ready for review July 26, 2024 21:58
Comment on lines 823 to 828
if is_chunked_array(data_old):
print(f"problematic chunks = {chunks}")
# if is_dict_like(chunks) and chunks != {}:
# chunks = tuple(chunks.get(n, s) for n, s in enumerate(data_old.shape)) # type: ignore[assignment]

print(f"hopefully normalized chunks = {chunks}")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really irritating - if I keep these lines commented out then my test_rechunk on the DummyChunkedArray fails. But if I uncomment these lines (therefore doing exactly what happens in the other branch of the if is_chunked_array(data_old): statement) then dask rechunk tests fail!

There are so many possible valid argument types for chunks here, some of which are dicts but completely different, e.g. {0: (2, 3)} vs {'x': (2, 3)}.

It would be much nicer for all possible chunks to go through a single normalize_chunks function, but I'm getting confused even trying to work out what the current behaviour is.

The ChunkManager has a .normalize_chunks method, to call out to dask.array.normalize_chunks. Cubed vendors this function too, so perhaps instead xarray should vendor dask.array.normalize_chunks and remove it from the ChunkManager class?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_rechunk on the DummyChunkedArray fails

This was actually mostly my fault for having a bug in that test, fixed by 0296f92.

It would be much nicer for all possible chunks to go through a single normalize_chunks function

But there is still some unnecessary complexity that would be nice to remove. The main reason why the weird is_dict_like(chunks): sections that turn dicts of chunks into tuples are currently needed is because of this bug in dask.array.core.normalize_chunks dask/dask#11261. Otherwise we could just use that.

(If we do just use that we should perhaps vendor it though - as cubed does already).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I managed to sort this all out, so now everything goes through dask.array.core.normalize_chunks, which is much neater.

Question is now do I:

  1. Vendor dask.array.core.normalize_chunks (like cubed does), and use the vendored version no matter which ChunkManager is called
  2. Make all chunkmanagers define a normalize_chunks method and refer to that (what the main code currently does).

I think we actually have to do (1), because we now have a codepath which will try to call normalize_chunks even on chunked arrays that do not define a chunkmanager. But we want to vendor it without introducing any more dependencies (e.g. toolz).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dcherian I would appreciate your input on this vendoring question before I move ahead with it ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vendor it! Sorry for the delay. We can generalize if it's ever needed

Comment on lines 183 to 186
def test_computation(self) -> None:
dummy_arr = DummyChunkedArray(shape=(4,), chunks=((1,),))
na: NamedArray = NamedArray(data=dummy_arr, dims=["x"])
na.mean()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure what the intended behaviour should be here. This test tests what happens if you try to compute an array that has .chunks but is not registered via any chunkmanager.

In virtualizarr's case this situation should just raise immediately because ManifestArrays are not computable, so from virtualizarr's PoV it doesn't really matter what happens here.

@hmaarrfk what is the preferred behaviour for your chunked arrays?

I guess if it does attempt to pass computation through here that could cause issues when computing on a cubed array with cubed-xarray not installed... (That scenario can't happen for dask because the equivalent DaskManager (i.e. dask-xarray) is effectively bundled inside xarray.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The users of my array (the rest of our team) feels like all these should do something.

It might make things REALLY slow, but I feel like mean should compute.... Your chunked array should know how best to compute it for itself. How should it compute intermediate results? In what order should it go through the array.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make things REALLY slow, but I feel like mean should compute....

Yes I agree.

How should it compute intermediate results? In what order should it go through the array.

I'm not sure I understand you here.

I guess if it does attempt to pass computation through here that could cause issues when computing on a cubed array with cubed-xarray not installed...

Thinking about this more I don't think it's a big deal. Not recognising a chunked array type will just mean that xarray falls back to calling numpy functions on it (e.g. da.mean() will call np.mean(arr)), which will call __array__ on the underlying type, coercing it to numpy, and in the case of cubed arrays, simply eagerly computing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cubed arrays, simply eagerly computing it.

maybe cubed would help us in our lazy arrays.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me personally that is

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this more I don't think it's a big deal.

You also kinda have to go out of your way to even created an xarray-wrapped cubed array without cubed-xarray installed, because you can only use open_dataset and .chunk to get cubed arrays if you have cubed-xarray installed.

maybe cubed would help us in our lazy arrays.

Maybe! The cubed.Plan model is super nice.

@@ -19,6 +20,7 @@
from xarray.tests import has_dask, requires_dask


# TODO can I subclass the chunkedduckarray protocol here?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Illviljan I hope I'm using all your cool duckarray type protocols correctly!

Comment on lines +99 to +107
try:
get_chunked_array_type(x)
except TypeError as e:
if str(e).startswith("Could not find a Chunk Manager which recognises type"):
return False
elif str(e) == "Expected a chunked array but none were found":
return False
else:
raise # something else went wrong
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a code smell, in which case has_chunkmanager, guess_chunkmanager, and get_chunked_array_type should be refactored.

@@ -183,7 +183,7 @@ def char_to_bytes(arr):
# can't make an S0 dtype
return np.zeros(arr.shape[:-1], dtype=np.bytes_)

if is_chunked_array(arr):
if is_chunked_array(arr) and has_chunkmanager(arr):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is_chunked_array(arr) and has_chunkmanager(arr) pattern becomes necessary because we are now considering the possibility that is_chunked_array(arr) == True but has_chunkmanager(arr) == False, whereas previously these were assumed to always be consistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@headtr1ck I got a notification saying you commented saying

But doesn't has_chunkmanager(arr) == True imply is_chunked_array(arr) == True?

(But I can't find your comment.)

It's a good question though. I think there are some array types that don't define a .chunks where you might still want to use other ChunkManager methods.

In particular JAX is interesting - it has a top-level pmap function which applies a function over multiple axes of an array similar to apply_gufunc. It distributes computation, but not over .chunks (which JAX doesn't define), instead over a global variable jax.local_device_count.

This is why I think we should rename ChunkManager to ComputeManager.

cc @alxmrs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@headtr1ck I got a notification saying you commented saying

But doesn't has_chunkmanager(arr) == True imply is_chunked_array(arr) == True?

(But I can't find your comment.)

It's a good question though. I think there are some array types that don't define a .chunks where you might still want to use other ChunkManager methods.

I came to the same conclusion, that's why I deleted the comment, sry.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries! I prefer to leave all my half-baked thoughts in the open and double or triple-post 😅 If you were wondering it then other people will definitely have the same question!

This is why I think we should rename ChunkManager to ComputeManager.

I could leave this to a second PR, to isolate the breaking changes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomNicholas FYI JAX does now support something a bit like chunking via sharding of jax.Array, there's a good summary here: https://jax.readthedocs.io/en/latest/notebooks/Distributed_arrays_and_automatic_parallelization.html
IIUC this is now preferred over pmap.

tomwhite added a commit to cubed-dev/cubed that referenced this pull request Aug 8, 2024
When rechunking with a dict that doesn't contain all axes, then the chunking
should be unchanged for those axes that are missing.

In particular, `a.rechunk({})` should be a no-op.

This is consistent with Dask (dask/dask#11261)
and Xarray (pydata/xarray#9286)
tomwhite added a commit to cubed-dev/cubed that referenced this pull request Aug 9, 2024
When rechunking with a dict that doesn't contain all axes, then the chunking
should be unchanged for those axes that are missing.

In particular, `a.rechunk({})` should be a no-op.

This is consistent with Dask (dask/dask#11261)
and Xarray (pydata/xarray#9286)
@TomNicholas TomNicholas removed the request for review from dcherian September 2, 2024 16:59
thodson-usgs pushed a commit to thodson-usgs/cubed that referenced this pull request Oct 24, 2024
When rechunking with a dict that doesn't contain all axes, then the chunking
should be unchanged for those axes that are missing.

In particular, `a.rechunk({})` should be a no-op.

This is consistent with Dask (dask/dask#11261)
and Xarray (pydata/xarray#9286)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
topic-chunked-arrays Managing different chunked backends, e.g. dask
Projects
None yet
Development

Successfully merging this pull request may close these issues.

A basic default ChunkManager for arrays that report their own chunks
5 participants