Skip to content

Commit

Permalink
apacheGH-34639: [C++] Support RecordBatch::FromStructArray even if st…
Browse files Browse the repository at this point in the history
…ruct array has nulls/offsets (apache#34691)

### Rationale for this change

A struct array can have a validity map and an offset.  A record batch cannot.  When converting from a struct array to a record batch we were throwing an error if a validity map was present and returning the wrong data if an offset was present.

### What changes are included in this PR?

If a validity map or offset are present then StructArray::Flatten is used to push the offset and validity map into the struct array.  Note: this means that RecordBatch::FromStructArray will not be zero-copy if it has to push down a validity map.

### Are these changes tested?

Yes

### Are there any user-facing changes?

Yes, RecordBatch::FromStructArray now takes in a memory pool because it might have to make allocations when pushing validity bitmaps down.

* Closes: apache#34639

Authored-by: Weston Pace <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
westonpace authored and ArgusLi committed May 15, 2023
1 parent ad32a64 commit 75cd740
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
5 changes: 3 additions & 2 deletions cpp/src/arrow/json/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ class DecodingOperator {

std::shared_ptr<ChunkedArray> chunked;
RETURN_NOT_OK(builder->Finish(&chunked));
ARROW_ASSIGN_OR_RAISE(auto batch, RecordBatch::FromStructArray(chunked->chunk(0)));
ARROW_ASSIGN_OR_RAISE(
auto batch, RecordBatch::FromStructArray(chunked->chunk(0), context_->pool()));

return DecodedBlock{std::move(batch), num_bytes};
}
Expand Down Expand Up @@ -552,7 +553,7 @@ Result<std::shared_ptr<RecordBatch>> ParseOne(ParseOptions options,
std::shared_ptr<ChunkedArray> converted_chunked;
RETURN_NOT_OK(builder->Finish(&converted_chunked));

return RecordBatch::FromStructArray(converted_chunked->chunk(0));
return RecordBatch::FromStructArray(converted_chunked->chunk(0), context.pool());
}

} // namespace json
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/arrow/record_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,20 @@ Result<std::shared_ptr<RecordBatch>> RecordBatch::MakeEmpty(
}

Result<std::shared_ptr<RecordBatch>> RecordBatch::FromStructArray(
const std::shared_ptr<Array>& array) {
const std::shared_ptr<Array>& array, MemoryPool* memory_pool) {
if (array->type_id() != Type::STRUCT) {
return Status::TypeError("Cannot construct record batch from array of type ",
*array->type());
}
if (array->null_count() != 0) {
return Status::Invalid(
"Unable to construct record batch from a StructArray with non-zero nulls.");
if (array->null_count() != 0 || array->offset() != 0) {
// If the struct array has a validity map or offset we need to push those into
// the child arrays via Flatten since the RecordBatch doesn't have validity/offset
const std::shared_ptr<StructArray>& struct_array =
internal::checked_pointer_cast<StructArray>(array);
ARROW_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<Array>> fields,
struct_array->Flatten(memory_pool));
return Make(arrow::schema(array->type()->fields()), array->length(),
std::move(fields));
}
return Make(arrow::schema(array->type()->fields()), array->length(),
array->data()->child_data);
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/record_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,16 @@ class ARROW_EXPORT RecordBatch {
/// \brief Construct record batch from struct array
///
/// This constructs a record batch using the child arrays of the given
/// array, which must be a struct array. Note that the struct array's own
/// null bitmap is not reflected in the resulting record batch.
/// array, which must be a struct array.
///
/// \param[in] array the source array, must be a StructArray
/// \param[in] pool the memory pool to allocate new validity bitmaps
///
/// This operation will usually be zero-copy. However, if the struct array has an
/// offset or a validity bitmap then these will need to be pushed into the child arrays.
/// Pushing the offset is zero-copy but pushing the validity bitmap is not.
static Result<std::shared_ptr<RecordBatch>> FromStructArray(
const std::shared_ptr<Array>& array);
const std::shared_ptr<Array>& array, MemoryPool* pool = default_memory_pool());

/// \brief Determine if two record batches are exactly equal
///
Expand Down
19 changes: 18 additions & 1 deletion cpp/src/arrow/record_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,19 @@ TEST_F(TestRecordBatch, ToFromEmptyStructArray) {
ASSERT_TRUE(batch1->Equals(*batch2));
}

TEST_F(TestRecordBatch, FromSlicedStructArray) {
static constexpr int64_t kLength = 10;
std::shared_ptr<Array> x_arr = ArrayFromJSON(int64(), "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]");
StructArray struct_array(struct_({field("x", int64())}), kLength, {x_arr});
std::shared_ptr<Array> sliced = struct_array.Slice(5, 3);
ASSERT_OK_AND_ASSIGN(auto batch, RecordBatch::FromStructArray(sliced));

std::shared_ptr<Array> expected_arr = ArrayFromJSON(int64(), "[5, 6, 7]");
std::shared_ptr<RecordBatch> expected =
RecordBatch::Make(schema({field("x", int64())}), 3, {expected_arr});
AssertBatchesEqual(*expected, *batch);
}

TEST_F(TestRecordBatch, FromStructArrayInvalidType) {
random::RandomArrayGenerator gen(42);
ASSERT_RAISES(TypeError, RecordBatch::FromStructArray(gen.ArrayOf(int32(), 6)));
Expand All @@ -339,7 +352,11 @@ TEST_F(TestRecordBatch, FromStructArrayInvalidType) {
TEST_F(TestRecordBatch, FromStructArrayInvalidNullCount) {
auto struct_array =
ArrayFromJSON(struct_({field("f1", int32())}), R"([{"f1": 1}, null])");
ASSERT_RAISES(Invalid, RecordBatch::FromStructArray(struct_array));
ASSERT_OK_AND_ASSIGN(auto batch, RecordBatch::FromStructArray(struct_array));
std::shared_ptr<Array> expected_arr = ArrayFromJSON(int32(), "[1, null]");
std::shared_ptr<RecordBatch> expected =
RecordBatch::Make(schema({field("f1", int32())}), 2, {expected_arr});
AssertBatchesEqual(*expected, *batch);
}

TEST_F(TestRecordBatch, MakeEmpty) {
Expand Down

0 comments on commit 75cd740

Please sign in to comment.