Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support options in ak.merge_union_of_records #2236

Merged
merged 12 commits into from
Feb 14, 2023
3 changes: 3 additions & 0 deletions src/awkward/_nplikes/array_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ def regularize_index_for_length(
def nonzero(self, x: ArrayLike) -> tuple[ArrayLike, ...]:
return self._module.nonzero(x)

# Forward `where` to the wrapped array module: `condition`, `x1` and `x2` are
# passed through unchanged to `self._module.where`, and its result is returned as-is.
# NOTE(review): presumably this follows NumPy's `where(condition, x1, x2)`
# semantics; that depends on what `self._module` is bound to — confirm.
def where(self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike) -> ArrayLike:
return self._module.where(condition, x1, x2)

def unique_values(self, x: ArrayLike) -> ArrayLike:
return self._module.unique(
x,
Expand Down
4 changes: 4 additions & 0 deletions src/awkward/_nplikes/numpylike.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ def reshape(
def nonzero(self, x: ArrayLike) -> tuple[ArrayLike, ...]:
...

# Abstract declaration of `where(condition, x1, x2)`. No behaviour is defined
# here (the body is `...`); it is marked `@abstractmethod`, so the
# implementation is expected to come from concrete subclasses.
@abstractmethod
def where(self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike) -> ArrayLike:
...

@abstractmethod
def unique_values(self, x: ArrayLike) -> ArrayLike:
...
Expand Down
9 changes: 8 additions & 1 deletion src/awkward/_nplikes/typetracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,16 @@ def nonzero(self, x: ArrayLike) -> tuple[TypeTracerArray, ...]:
try_touch_data(x)
return (TypeTracerArray._new(np.int64, (unknown_length,)),) * len(x.shape)

def where(
self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike
) -> TypeTracerArray:
condition, x1, x2 = self.broadcast_arrays(condition, x1, x2)
result_dtype = numpy.result_type(x1, x2)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: implement result_type for all nplikes. This coincides with the planned introduction of the metadata submodule

return TypeTracerArray._new(result_dtype, shape=condition.shape)

def unique_values(self, x: ArrayLike) -> TypeTracerArray:
try_touch_data(x)
return TypeTracerArray._new(x.dtype, shape=(None,))
return TypeTracerArray._new(x.dtype, shape=(unknown_length,))

def concat(self, arrays, *, axis: int | None = 0) -> TypeTracerArray:
if axis is None:
Expand Down
312 changes: 205 additions & 107 deletions src/awkward/operations/ak_merge_union_of_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


import awkward as ak
from awkward._nplikes.numpylike import NumpyMetadata
from awkward._nplikes.numpylike import ArrayLike, NumpyMetadata

np = NumpyMetadata.instance()
cpu = ak._backends.NumpyBackend.instance()
Expand Down Expand Up @@ -41,32 +41,102 @@ def _impl(array, axis, highlevel, behavior):
behavior = ak._util.behavior_of(array, behavior=behavior)
layout = ak.to_layout(array, allow_record=False)

def apply_displace_index(layout, backend, **kwargs):
if layout.is_record:
return layout
elif layout.is_option and layout.content.is_record:
raise ak._errors.wrap_error(
TypeError(
"optional records cannot be merged by this function. First call `ak.merge_option_of_records` "
"to convert these into records of options."
def invert_record_union(
tags: ArrayLike, index: ArrayLike, contents, *, backend
) -> ak.contents.RecordArray:
index_nplike = backend.index_nplike
# First, create an ordered list containing the union of all fields
seen_fields = set()
all_fields = []
for content in contents:
# Find new fields
for field in content.fields:
if field not in seen_fields:
seen_fields.add(field)
all_fields.append(field)

# Build unions for each field
outer_field_contents = []
for field in all_fields:
field_tags = index_nplike.asarray(tags, copy=True)
field_index = index_nplike.asarray(index, copy=True)

# Build contents for union representing current field
field_contents = [c.content(field) for c in contents if c.has_field(field)]

# Find the best location for option type.
# We will potentially have fewer contents in this per-field union
# than the original outer union-of-records, because some recordarrays
# may not have the given field.
tag_for_missing = 0
for i, content in enumerate(field_contents):
if content.is_option:
tag_for_missing = i
break

# If at least one recordarray doesn't have this field, we add
# a special option
if len(field_contents) < len(contents):
# Make the tagged content an option, growing by one to ensure we
# have a known `None` value to index into
tagged_content = field_contents[tag_for_missing]
indexedoption_index = backend.index_nplike.arange(
tagged_content.length + 1, dtype=np.int64
)
indexedoption_index[
index_nplike.shape_item_as_index(tagged_content.length)
] = -1
field_contents[
tag_for_missing
] = ak.contents.IndexedOptionArray.simplified(
ak.index.Index64(indexedoption_index), tagged_content
)

# Index of None values in tagged content (content with extra None item at end)
index_missing = index_nplike.shape_item_as_index(
field_contents[tag_for_missing].length - 1
)
elif layout.is_indexed and layout.content.is_record:
record = layout.content
# Transpose index-of-record to record-of-index
return ak.contents.RecordArray(
[
ak.contents.IndexedArray.simplified(
layout.index, c, parameters=layout._parameters
)
for c in record.contents
],
record.fields,
record.length,
backend=backend,
# Now build contents for union, by looping over outermost index
# Overwrite tags to adjust for new contents length
# and use the tagged content for any missing values
k = 0
for j, content in enumerate(contents):
tag_is_j = field_tags == j

if content.has_field(field):
# Rewrite tags to account for missing fields
field_tags[tag_is_j] = k
k += 1

else:
# Rewrite tags to point to option content
field_tags[tag_is_j] = tag_for_missing
# Point each value to missing value
field_index[tag_is_j] = index_missing

outer_field_contents.append(
ak.contents.UnionArray.simplified(
ak.index.Index8(field_tags),
ak.index.Index64(field_index),
field_contents,
)
)
else:
raise ak._errors.wrap_error(TypeError(layout))
return ak.contents.RecordArray(
outer_field_contents, all_fields, backend=backend
)

# Build a dense option index from `index`: entries that are negative in `index`
# become -1, and the remaining entries are numbered 0, 1, 2, ... in order of
# position. Only the sign of each input entry is used; the non-negative values
# themselves are discarded.
#
# index: integer array in which negative entries mark missing items.
# backend: object whose `index_nplike` provides `count_nonzero`, `empty` and `arange`.
# Returns a new array created with `len(index)` items and `index.dtype`.
def compact_option_index(index: ArrayLike, *, backend) -> ArrayLike:
# Find dense (outer) index into non-null items.
# This is in trivial order: the re-arranging is done by the union (below)
is_none = index < 0
num_none = backend.index_nplike.count_nonzero(is_none)
# Every slot of this buffer is overwritten by exactly one of the two masked
# assignments below, because `is_none` and `~is_none` are complementary.
dense_index = backend.index_nplike.empty(len(index), dtype=index.dtype)
dense_index[is_none] = -1
# Non-null slots receive consecutive positions 0 .. (len(index) - num_none - 1).
dense_index[~is_none] = backend.index_nplike.arange(
len(index) - num_none,
dtype=index.dtype,
)
return dense_index

def apply(layout, depth, backend, **kwargs):
posaxis = ak._util.maybe_posaxis(layout, axis, depth)
Expand All @@ -75,93 +145,121 @@ def apply(layout, depth, backend, **kwargs):
np.AxisError(f"axis={axis} exceeds the depth of this array ({depth})")
)
elif depth == posaxis + 1 and layout.is_union:
if all(x.is_record for x in layout.contents):
# First, find all ordered fields, regularising any index-of-record
# such that we have record-of-index
seen_fields = set()
all_fields = []
regularised_contents = []
for content in layout.contents:
# Ensure that we have record-of-index
regularised_content = ak._do.recursively_apply(
content, apply_displace_index
)
regularised_contents.append(regularised_content)

# Find new fields
for field in regularised_content.fields:
if field not in seen_fields:
seen_fields.add(field)
all_fields.append(field)

# Build unions for each field
outer_field_contents = []
for field in all_fields:
field_tags = backend.index_nplike.asarray(layout.tags, copy=True)
field_index = backend.index_nplike.asarray(layout.index, copy=True)

# Build contents for union representing current field
field_contents = [
c.content(field)
for c in regularised_contents
if c.has_field(field)
]

# Find the best location for option type.
# We will potentially have fewer contents in this per-field union
# than the original outer union-of-records, because some recordarrays
# may not have the given field.
tag_for_missing = 0
for i, content in enumerate(field_contents):
if not all(
x.is_record or x.is_indexed or x.is_option for x in layout.contents
):
return

# Any option types need to be re-written
if any(x.is_option for x in layout.contents):
# We'll create an outermost indexed-option type, which re-instates the missing values
outer_option_index = backend.index_nplike.arange(
layout.length, dtype=np.int64
)

# We'll rebuild the union to include only the non-null items.
inner_union_index_parts = []
next_contents = []
next_tags_sparse = backend.index_nplike.asarray(layout.tags, copy=True)
for tag, content in enumerate(layout.contents):
is_this_tag = backend.index_nplike.asarray(layout.tags) == tag

# Union arrays for this content
tag_index = backend.index_nplike.asarray(layout.index)[is_this_tag]

# For trivial partitions, we just include them as-is
if isinstance(content, ak.contents.UnmaskedArray):
next_contents.append(content.content)
inner_union_index_parts.append(tag_index)
elif content.is_option or content.is_indexed:
# Let's work with indexed option types for ease
if content.is_option:
tag_for_missing = i
break

# If at least one recordarray doesn't have this field, we add
# a special option
if len(field_contents) < len(regularised_contents):
# Make the tagged content an option, growing by one to ensure we
# have a known `None` value to index into
tagged_content = field_contents[tag_for_missing]
indexedoption_index = backend.index_nplike.arange(
tagged_content.length + 1, dtype=np.int64
)
indexedoption_index[tagged_content.length] = -1
field_contents[
tag_for_missing
] = ak.contents.IndexedOptionArray.simplified(
ak.index.Index64(indexedoption_index), tagged_content
content = content.to_IndexedOptionArray64()

# Now find the inner index that actually re-arranges the (non-null) items
content_index = backend.index_nplike.asarray(content.index)
merged_index = content_index[tag_index]
is_non_null = merged_index >= 0
inner_union_index_parts.append(merged_index[is_non_null])
# Mask out tags of items that are missing
next_tags_sparse[is_this_tag] = backend.index_nplike.where(
is_non_null, tag, -1
)

# Now build contents for union, by looping over outermost index
# Overwrite tags to adjust for new contents length
# and use the tagged content for any missing values
k = 0
for j, content in enumerate(regularised_contents):
tag_is_j = field_tags == j

if content.has_field(field):
# Rewrite tags to account for missing fields
field_tags[tag_is_j] = k
k += 1

else:
# Rewrite tags to point to option content
field_tags[tag_is_j] = tag_for_missing
# Point each value to missing value
field_index[tag_is_j] = (
field_contents[tag_for_missing].length - 1
)

outer_field_contents.append(
ak.contents.UnionArray.simplified(
ak.index.Index8(field_tags),
ak.index.Index64(field_index),
field_contents,
# Find dense index into non-null items of this content for the outer optiontype.
outer_option_index[is_this_tag] = backend.index_nplike.where(
is_non_null, outer_option_index[is_this_tag], -1
Copy link
Collaborator Author

@agoose77 agoose77 Feb 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something that might be better as a kernel (and perhaps we already have a kernel that is applicable here).

)
)
return ak.contents.RecordArray(
outer_field_contents, all_fields, backend=backend

# outer_option_index has the same length as layout, so it stays aligned with the union's tags and index.
# Entries for null union items are set to -1 here; the non-null entries are densified after the loop.
next_contents.append(content.content)
else:
next_contents.append(content)
inner_union_index_parts.append(tag_index)

outer_option_index = compact_option_index(
outer_option_index, backend=backend
)

# Find length of the new (dense) tags array of the inner union
total_length = 0
for array in inner_union_index_parts:
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
total_length += array.size

# Ignore missing items for inner union, creating a dense array of tags
next_tags = next_tags_sparse[next_tags_sparse >= 0]
# Build dense index from parts for each tag
next_index = backend.index_nplike.empty(total_length, dtype=np.int64)
for tag, content_index in enumerate(inner_union_index_parts):
next_index[next_tags == tag] = content_index

# Return option around record of unions
return ak.contents.IndexedOptionArray(
ak.index.Index64(outer_option_index),
invert_record_union(
next_tags, next_index, next_contents, backend=backend
),
)

# Any index types need to be re-written
elif any(x.is_indexed for x in layout.contents):
# We'll build a new union index (next_index) from the current union index and each indexed content's own index
current_index = backend.index_nplike.asarray(layout.index)
next_index = backend.index_nplike.empty(
current_index.size, dtype=np.int64
)

# We'll rebuild the union's contents, replacing each indexed content with the content it wraps.
next_contents = []
for tag, content in enumerate(layout.contents):
is_this_tag = backend.index_nplike.asarray(layout.tags) == tag

# Rewrite union index of indexed types
if content.is_indexed:
content_index = backend.index_nplike.asarray(content.index)
next_index[is_this_tag] = current_index[is_this_tag][
agoose77 marked this conversation as resolved.
Show resolved Hide resolved
content_index
]
next_contents.append(content.content)

else:
next_index[is_this_tag] = current_index[is_this_tag]
next_contents.append(content)

return invert_record_union(
backend.index_nplike.asarray(layout.tags),
next_index,
next_contents,
backend=backend,
)

else:
return invert_record_union(
backend.index_nplike.asarray(layout.tags),
backend.index_nplike.asarray(layout.index),
layout.contents,
backend=backend,
)

out = ak._do.recursively_apply(layout, apply)
Expand Down
Loading