Map Dask Series to Dask Series #4872

Merged: 9 commits into dask:master, Jun 17, 2019

Conversation

@bluecoconut (Contributor) commented Jun 2, 2019

  • Tests added / passed
  • Passes flake8 dask (yes for all changes, though one unrelated flake8 error remains in dask/tests/test_utils.py)

Added an implementation that shards the base and map series and then combines them on chunks with the same key (index of the map series, value of the base series).

This scales in number of tasks like: O(N*M+M*M) where N is npartitions of base and M is npartitions of map series.

Addresses: #4870
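A minimal single-machine sketch of that shard-and-combine idea in plain pandas (hypothetical helper name, assuming unique indexes; the real implementation in this PR does this per partition inside series_map):

    import pandas as pd

    def map_series_sketch(base, mapper, m):
        # Bucket the base by the hash of its *values* and the mapper by the hash
        # of its *index*, so shards sharing a bucket number hold matching keys.
        base_buckets = pd.util.hash_pandas_object(base, index=False) % m
        map_buckets = pd.util.hash_pandas_object(mapper.index) % m
        pieces = []
        for i in range(m):
            base_shard = base[base_buckets == i]
            map_shard = mapper[map_buckets == i]
            # Each base shard only needs the mapper shard from the same bucket.
            pieces.append(base_shard.map(map_shard))
        # Reassemble in the original order of the base index.
        return pd.concat(pieces).reindex(base.index)

In the dask version each of the N base partitions and M map partitions is bucketed this way, the map shards are concatenated per bucket, and only matching shards are shipped between workers.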

@TomAugspurger (Member) left a comment:

I'm probably misremembering something about pandas' Series.map, but does the following alternative implementation work?

In [65]: def map2(self, s):
    ...:     return dd.map_partitions(lambda a, b: a.reindex(b.values), self, s)
    ...:
    ...:

In [66]: s = pd.Series([1, 2])

In [67]: ds = dd.from_pandas(s, 2)

In [68]: map2(ds, ds).compute()
Out[68]:
1    2.0
2    NaN
dtype: float64

@pytest.mark.parametrize("map_npart", [1, 3])
@pytest.mark.parametrize("sorted_index", [False, True])
def test_series_map(base_npart, map_npart, sorted_index):
    np.random.seed(0)

Member:

Is this necessary? I'd be concerned if the test only passed for specific rngs :)

Contributor (Author):

Haha, good point. I usually include it for deterministic hashing, in case someone extends the tests based on values produced by "generators". It works just fine in this case without it, so I will remove it.

@@ -2275,6 +2275,8 @@ def isin(self, values):
@insert_meta_param_description(pad=12)
@derived_from(pd.Series)
def map(self, arg, na_action=None, meta=no_default):
    if is_series_like(arg) and is_dask_collection(arg):
        return series_map(self, arg)

Member:

Should meta be passed through here?

Contributor (Author):

I don't think so. For this operation the output meta is always known, since we have meta for both incoming series (e.g. the output will have the index of the base and the dtype of the map series).
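For illustration, a rough sketch of how that output meta could be derived from the two input metas (hypothetical helper, not the code in this PR; assumes both metas are empty pandas Series):

    import pandas as pd

    def map_output_meta(base_meta, mapper_meta):
        # The result keeps the base's (empty) index and name,
        # with the value dtype taken from the mapper.
        return pd.Series([], dtype=mapper_meta.dtype,
                         index=base_meta.index[:0], name=base_meta.name)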

Member:

Hmm, we should perhaps error if the user provides metadata that doesn't match our expected meta then. I don't like silently ignoring arguments.

The difficult case is Series[int].map(Series[int]), since the missing values may cast to float.
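For illustration, a small pandas example of that cast (made-up values):

    import pandas as pd

    s = pd.Series([1, 2, 3])                    # int64 values to look up
    mapper = pd.Series([10, 20], index=[1, 2])  # int64 mapper with no entry for 3

    print(s.map(mapper))
    # 0    10.0
    # 1    20.0
    # 2     NaN
    # dtype: float64   (the missing key forces a cast from int64 to float64)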

Contributor (Author):

For matching metadata: is there a method already around for managing this? Taking 2 meta and checking they agree?

Ah, for that difficult case, is there a planned strategy? I have noticed this already happening in dask where meta is not strictly enforced on computation, especially with the null ints becoming floats. Is there something we should do here to capture that, or should we just match the behavior of other methods and return the "usually intended" meta (e.g. in this case it's expected to be int, but may come out otherwise in execution)?

@bluecoconut (Contributor, Author):

For the map2 that's defined above: it would only work under some very specific partitioning schemes:

  1. The partitions are aligned (same number of partitions), unless I'm misunderstanding how map_partitions does broadcasting.
  2. The values in the "base" series match up with the values in the "map" series on each of the corresponding partitions.

The implementation I included had to do a shard + shuffle (based on the hash of the value column in the base and of the index in the map) to ensure alignment of the local map, and then concat and re-index based on the input to match the partition scheme of the input base. (The output actually has the exact same distributed index as the input base, regardless of npartitions on the map series.)
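To illustrate the concern, a contrived pandas-only sketch of the per-partition pairing that map_partitions would do when both series have two partitions (made-up data):

    import pandas as pd

    base_parts = [pd.Series([1, 4]), pd.Series([2, 3])]    # base values to look up
    map_parts = [pd.Series(['a', 'b'], index=[1, 2]),      # mapper chunk 0
                 pd.Series(['c', 'd'], index=[3, 4])]      # mapper chunk 1

    # Each base chunk is only paired with the mapper chunk in the same position,
    # so the value 4 (whose key lives in mapper chunk 1) comes back as NaN.
    print(pd.concat(m.reindex(b.values) for b, m in zip(base_parts, map_parts)))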

@TomAugspurger (Member) left a comment:

What's the communication pattern like here? Does every partition of self see every partition of arg? I think it has to, right?


np.random.shuffle(index)
base.index = index
map_index = [''.join(x) for x in product('abc', repeat=3)]
map = pd.Series(np.random.randint(50, size=len(map_index)))

Member:

map is a builtin, so let's call this mapper.

And pass index=map_index rather than assigning next.

Contributor (Author):

done

map_index = [''.join(x) for x in product('abc', repeat=3)]
map = pd.Series(np.random.randint(50, size=len(map_index)))
map.index = map_index
pandas_answer = base.map(map)

Member:

Call this expected

Contributor (Author):

👍

pandas_answer = base.map(map)
dask_base = dd.from_pandas(base, npartitions=base_npart)
dask_map = dd.from_pandas(map, npartitions=map_npart)
dask_answer = dask_base.map(dask_map).compute()

Member:

Call this result and don't .compute(). assert_eq will call compute.

Contributor (Author):

Awesome~ I didn't know that

dask_base = dd.from_pandas(base, npartitions=base_npart)
dask_map = dd.from_pandas(map, npartitions=map_npart)
dask_answer = dask_base.map(dask_map).compute()
dd.utils.assert_eq(pandas_answer.sort_index(), dask_answer.sort_index())

Member:

Why the need for sort_index?

Contributor (Author):

There was no need, removed.



def mapseries(base_chunk, concat_map):
    return base_chunk.map(concat_map.sort_index())

Member:

Why the sort_index?

Contributor (Author):

The concat_map will not come in sorted, and the pandas map operation errors if the index of the mapping series is not sorted. This ensures it is sorted so pandas map is happy.

Member:

pandas map operation errors if the index of the mapping series is not sorted.

Can you clarify? IIUC, that's not correct:

In [9]: s = pd.Series([3, 2, 1], index=[1, 0, 2])

In [10]: t = pd.Series([2, 1, 3], index=['b', 'a', 'c'])

In [11]: t.map(s)
Out[11]:
b    1.0
a    3.0
c    NaN
dtype: float64

Contributor (Author):

Ah, yes, you are right. I was misremembering why exactly I added this, apparently; sorry for being sloppy and quick in my response.

The error came from applying the map where the mapper was the result of a groupby with split_out=N from some operations in dask (not sure which ones anymore). The resulting column (on the partition), when doing df.partitions[0].compute(), was a df with a unique and locally sorted index; however, calling df.partitions[0].compute().index.is_unique returned False, thus raising an error. Simply calling sort_index seemed to cause the index to re-calculate its own "cached" is_unique value, thus passing the check (as it should).

The error that I would get was: https://github.com/pandas-dev/pandas/blob/cb00deb94500205fcb27a33cc1d0df79a9727f8b/pandas/core/indexes/base.py#L2740

It's worth noting that in a previous iteration I also always called .sort_index() on the dask DF (mapping = mapping.map_partitions(pd.Series.sort_index)), and that still didn't seem to reset this index bug; I had to do it immediately before applying the final map inside the mapping function (where .index.is_unique would incorrectly report False when in reality the index was unique).

The other implementation I have includes doing:

    if not map_series.index.is_unique:
        # TODO: Figure out why I have to add this. This feels like a definite bug in dask <-> pandas
        map_series = map_series.sort_index()

Contributor (Author):

All in all: the tests here pass at the moment without .sort_index, and I can't seem to reproduce this odd is_unique bug, but I suspect it might come back.

So, for now: I removed the .sort_index() and added some tests to series_map that also shuffle the index of the mapper.

@bluecoconut (Contributor, Author) commented Jun 3, 2019

What's the communication pattern like here? Does every partition of self see every partition of arg? I think it has to, right?

Communication pattern is:

  • Base (self) has N partitions that are split into N*M shards based on hash(self.values) % M (where M is the number of partitions in map (arg)).
  • Map (arg) has M partitions that are split into M*M shards based on hash(arg.index) % M.
  • A concat step brings the M*M map shards together into M combined "map" partitions.
  • Each of the M combined map partitions is mapped onto the N base shards it aligns with (covering the N*M base shards).
  • Then the mapped shards are combined back into N partitions and re-indexed to match the original base (self) index.

So, no: not every partition of self has to see every partition of arg fully, as we can use the hash and shuffle/split to align them based on value buckets. However, yes, it's possible for values from any input partition to be mapped by any map partition.
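A back-of-the-envelope task count for this scheme (illustrative only, assuming each shard, concat, map, and combine step is its own task):

    def approx_task_count(n, m):
        base_shards = n * m   # each of the n base partitions split into m shards
        map_shards = m * m    # each of the m map partitions split into m shards
        concat_map = m        # m*m map shards concatenated into m combined mappers
        map_step = n * m      # each combined mapper applied to its n matching base shards
        combine = n           # mapped shards reassembled into the original n partitions
        return base_shards + map_shards + concat_map + map_step + combine

The dominant terms grow like O(N*M + M*M), matching the estimate at the top of the PR.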

@jakirkham (Member):

@TomAugspurger, would you have a chance to take another look?

@TomAugspurger (Member):

Waiting to hear about #4872 (comment).

@bluecoconut (Contributor, Author):

@TomAugspurger Answered and responded!

Removed the .sort_index() and will spend some time looking to cleanly reproduce the error I saw in the past.

@mrocklin (Member) commented Jun 8, 2019

This scales in number of tasks like: O(N*M+M*M) where N is npartitions of base and M is npartitions of map series.

What should we do when N, M are in the thousands? Repartition? Raise with NotImplementedError and encourage the user to repartition? Use the set_index/shuffle code to move things around in stages?

@jcrist (Member) commented Jun 14, 2019

@bluecoconut, what's the status of this? Any thoughts on @mrocklin's comment above ^^^? Handling large scale cases (especially in the presence of naive use) is something we need to think about.

@bluecoconut (Contributor, Author):

The update from my end is largely that I don't have any clear updates, or additional time right now to chase down and properly benchmark new implementations for this.

In terms of scaling performance: I have run this many times over >2 TB series on both the base and map side, and it ran (on over 100 workers) in ~10 minutes without increasing memory pressure beyond the scale of the data that was persisted, just requiring swaps from worker to worker.

Towards the automatic-optimization questions (warning, erroring, or repartitioning automatically): I'm not sure I understand the ownership of optimization. If every individual method tries to optimize, repartition, etc. just to work, full operational graphs will want to override this behavior for optimal flow without lock-stepping, right? It feels like optimizing for "the operation" and not "the system", and it leads down a path of lots of bespoke oddities in behavior. To fight that, this method would then need hyper-parameters added for tuning its behavior.

In answer to the set_index/shuffle code: there are two separate sets of shuffling happening here, so purely in terms of the number of swaps, running the staged shuffling would seem to help on both the base shuffle and the map shuffle. This comes at an increase in the number of tasks, and some of the later swaps get larger in data-transfer size for single objects, so it might act like a bit of a bottleneck? I don't know without benchmarking it thoroughly, and I'd suspect it would result in needing to pass in max_branch_base and max_branch_map for performance tunability (again, more optimization hyper-parameters). Also, since the serialized objects in rearrange_by_column_tasks seem to require passing along the "partition index", this actually ends up being quite expensive in terms of memory use for these TB-scale single "series" objects, effectively doubling the data being swapped around if the dtype is the same (int64). All in all, while I see a slight benefit in the actual number of swaps being done, the total bandwidth of data moved around during the swaps actually seems higher (unless I'm misunderstanding something), and the total number of tasks is going to increase.

All in all, I feel like for a lot of the optimizations that could be done, the best direction to take is unclear. It really depends on the physical topology of the cluster and on the scale and type of the data being shuffled what the correct defaults really are. In terms of catching naive use: I'm not sure what the precedent in most of dask is, but I feel like it shows up enough in documentation that having large numbers of partitions can create graphs with ~N**2 tasks, and that's something to be mindful of in general.

Also, in terms of actual raw performance, operations like this depend highly on the cluster topology, and are much more reliant on data locality and on scheduling work on the correct nodes. I'm not sure how much optimization is done in the scheduler to handle this, but it feels like addressing scale for operations like this one lives better in graph optimization and the scheduler. E.g., to help with graph optimization, I include tokenization that is unique to the partitioning of the base and the map series, so that the graph can re-use the partitioned result if the larger execution graph maps the same map series onto multiple different base series.

Last, in follow-up to the .sort_index() oddity inside the mapseries method: in our deployment that includes this code, I had to add it back in because we immediately ran into the index problem. I was only able to chase it as far as the dask _concat methods (and could not make a reproducible example with auto-generated data, only private data so far). What happens is that even if I have two series that are both unique in their index, and unique after combining, then after concat, calling index.is_unique on the pandas object that is created returns False; to "reset" the cached answer, I have to call .sort_index(), which keeps the exact same length pandas object but then returns True for the is_unique property. Across an entire series that was split and concatenated back together, not all partitions have invalid is_unique cache values, but some do (seemingly at random). I also found different behavior if I made _concat a full "top level" dask task vs. including it in the arguments, e.g. (_concat, [obj1, obj2]) vs. (operation, (_concat, [obj1, obj2])). (Also of note: I'm using an Int64-based index, and I'm not sure whether the pandas dtype matters for the is_unique caching.) I'm not sure if any of you have experience with this or an idea of why it could happen.

@jcrist (Member) commented Jun 17, 2019

Ok. Thanks @bluecoconut. I'm fine merging this as is, thanks for your contribution!

@jcrist merged commit 255cc5b into dask:master on Jun 17, 2019
TomAugspurger added a commit to TomAugspurger/dask that referenced this pull request Jun 17, 2019
commit 255cc5b
Author: Justin Waugh <[email protected]>
Date:   Mon Jun 17 08:18:26 2019 -0600

    Map Dask Series to Dask Series (dask#4872)

    * index-test needed fix

    * single-parititon-error

    * added code to make it work

    * add tests

    * delete some comments

    * remove seed set

    * updated tests

    * remove sort_index and add tests
