Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43994: [C++][Parquet] Fix schema conversion from two-level encoding nested list #43995

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4093,6 +4093,82 @@ TEST(TestArrowReaderAdHoc, OldDataPageV2) {
TryReadDataFile(path);
}

// Checks that a list nested in a list, where both levels use the legacy
// two-level Parquet encoding, is exposed with the expected Parquet schema text
// and is read back as the expected Arrow table.
TEST(TestArrowReaderAdHoc, LegacyTwoLevelList) {
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
// Shared verifier: takes ownership of `file_reader`, compares the printed
// Parquet schema of its single top-level field, then hands the reader to an
// Arrow FileReader and compares the decoded table.
auto VerifyData = [](std::unique_ptr<ParquetFileReader> file_reader) {
// Expected Parquet schema of legacy two-level encoding
constexpr std::string_view kExpectedLegacyList =
"required group field_id=-1 a (List) {\n"
" repeated group field_id=-1 array (List) {\n"
" repeated int32 field_id=-1 array;\n"
" }\n"
"}\n";

// Expected Arrow schema and data
// Both list levels and the int32 leaf are declared non-nullable.
auto arrow_inner_list =
field("array", list(field("array", ::arrow::int32(), /*nullable=*/false)),
/*nullable=*/false);
auto arrow_outer_list = list(arrow_inner_list);
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
auto arrow_schema =
::arrow::schema({field("a", arrow_outer_list, /*nullable=*/false)});
// One row whose column `a` holds the nested list [[1,2],[3,4]].
auto expected_table = TableFromJSON(arrow_schema, {R"([[[[1,2],[3,4]]]])"});

// Verify Parquet schema
auto root_group = file_reader->metadata()->schema()->group_node();
ASSERT_EQ(1, root_group->field_count());
std::stringstream nodeStr;
PrintSchema(root_group->field(0).get(), nodeStr);
ASSERT_EQ(kExpectedLegacyList, nodeStr.str());

// Verify Arrow schema and data
std::unique_ptr<FileReader> reader;
// `file_reader` is moved from here and must not be used afterwards.
ASSERT_OK_NO_THROW(
FileReader::Make(default_memory_pool(), std::move(file_reader), &reader));
std::shared_ptr<Table> table;
ASSERT_OK(reader->ReadTable(&table));
AssertTablesEqual(*expected_table, *table);
};

// NOTE(review): the round-trip section below is entirely commented out, so
// this test currently exercises only the pre-generated parquet-java file.
// Confirm whether it is meant to be re-enabled before merging.
// Round-trip test for Parquet C++ reader and writer
// {
// // Create Parquet schema of legacy two-level encoding
// auto inner_list = GroupNode::Make("array", Repetition::REPEATED,
// {schema::Int32("array", Repetition::REPEATED)},
// LogicalType::List());
// auto outer_list =
// GroupNode::Make("a", Repetition::REQUIRED, {inner_list}, LogicalType::List());
// auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, {outer_list});
//
// // Create a Parquet writer to write values of nested list
// auto sink = CreateOutputStream();
// auto file_writer =
// ParquetFileWriter::Open(sink,
// std::dynamic_pointer_cast<GroupNode>(schema_node));
// auto row_group_writer = file_writer->AppendRowGroup();
// auto int_writer = dynamic_cast<Int32Writer*>(row_group_writer->NextColumn());
// ASSERT_TRUE(int_writer != nullptr);
//
// // Directly write a single row of nested list: [[1, 2],[3, 4]]
// constexpr int64_t kNumValues = 4;
// constexpr std::array<int16_t, kNumValues> kRepLevels = {0, 2, 1, 2};
// constexpr std::array<int16_t, kNumValues> kDefLevels = {2, 2, 2, 2};
// constexpr std::array<int32_t, kNumValues> kValues = {1, 2, 3, 4};
// int_writer->WriteBatch(kNumValues, kDefLevels.data(), kRepLevels.data(),
// kValues.data());
// file_writer->Close();
// ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
//
// // Read schema and verify it applies two-level encoding of list type
// ASSERT_NO_FATAL_FAILURE(
// VerifyData(ParquetFileReader::Open(std::make_shared<BufferReader>(buffer))));
// }

// Interoperability test for Parquet file generated by parquet-java
wgtmac marked this conversation as resolved.
Show resolved Hide resolved
{
// The file is looked up under the directory returned by test::get_data_dir().
auto path = std::string(test::get_data_dir()) + "/old_list_structure.parquet";
// ASSERT_NO_FATAL_FAILURE propagates fatal assertion failures raised inside
// the VerifyData lambda to this test body.
ASSERT_NO_FATAL_FAILURE(VerifyData(ParquetFileReader::OpenFile(path)));
}
}

// Value-parameterized gtest fixture with no members of its own; each test
// parameter is a (std::string, std::shared_ptr<DataType>) tuple.
// NOTE(review): presumably the string names a data file and the DataType is the
// Arrow type expected when reading it — confirm against the instantiation.
class TestArrowReaderAdHocSparkAndHvr
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<DataType>>> {};
Expand Down
111 changes: 110 additions & 1 deletion cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <vector>

#include "gmock/gmock-matchers.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"

Expand Down Expand Up @@ -601,6 +602,58 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
arrow_fields.push_back(::arrow::field("name", arrow_list, false));
}

// Two-level encoding List<List<Integer>>:
// optional group my_list (LIST) {
// repeated group array (LIST) {
// repeated int32 array;
// }
// }
{
auto inner_array =
PrimitiveNode::Make("array", Repetition::REPEATED, ParquetType::INT32);
auto outer_array = GroupNode::Make("array", Repetition::REPEATED, {inner_array},
ConvertedType::LIST);
parquet_fields.push_back(GroupNode::Make("my_list", Repetition::OPTIONAL,
{outer_array}, ConvertedType::LIST));
auto arrow_inner_array = ::arrow::field("array", INT32, /*nullable=*/false);
auto arrow_outer_array =
::arrow::field("array", ::arrow::list(arrow_inner_array), /*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_outer_array);
arrow_fields.push_back(::arrow::field("my_list", arrow_list, true));
}

// List<Map<String, String>> in three-level list encoding:
// optional group my_list (LIST) {
// repeated group list {
// required group element (MAP) {
// repeated group key_value {
// required binary key (STRING);
// optional binary value (STRING);
// }
// }
// }
// }
{
auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
ConvertedType::UTF8);
auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
auto element =
GroupNode::Make("element", Repetition::REQUIRED, {key_value}, ConvertedType::MAP);
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {list}, ConvertedType::LIST));

auto arrow_key = ::arrow::field("key", UTF8, /*nullable=*/false);
auto arrow_value = ::arrow::field("value", UTF8, /*nullable=*/true);
auto arrow_element = ::arrow::field(
"element", std::make_shared<::arrow::MapType>(arrow_key, arrow_value),
/*nullable=*/false);
auto arrow_list = ::arrow::list(arrow_element);
arrow_fields.push_back(::arrow::field("my_list", arrow_list, /*nullable=*/true));
}

auto arrow_schema = ::arrow::schema(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));

Expand Down Expand Up @@ -727,6 +780,60 @@ TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
}

// Verifies that a MAP group or a three-level LIST group cannot be nested
// directly inside a legacy two-level list: schema conversion must fail with
// Status::Invalid and the expected message in both cases.
TEST_F(TestConvertParquetSchema, IllegalParquetNestedSchema) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case verifies that three-level list and map cannot be nested in a legacy two-level list.

// List<Map<String, String>> in two-level list encoding:
//
// optional group my_list (LIST) {
// repeated group array (MAP) {
// repeated group key_value {
// required binary key (STRING);
// optional binary value (STRING);
// }
// }
// }
{
auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
ConvertedType::UTF8);
auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
auto array =
GroupNode::Make("array", Repetition::REPEATED, {key_value}, ConvertedType::MAP);
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));

// The repeated `array` group has a single repeated child but is annotated
// MAP rather than LIST, so conversion is expected to be rejected.
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid,
testing::HasSubstr("Group with one repeated child must be LIST-annotated."),
ConvertSchema(parquet_fields));
}

// List<List<String>>: outer list is two-level encoding, inner list is three-level
//
// optional group my_list (LIST) {
// repeated group array (LIST) {
// repeated group list {
// required binary element (STRING);
// }
// }
// }
{
auto element = PrimitiveNode::Make("element", Repetition::REQUIRED,
ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
auto list = GroupNode::Make("list", Repetition::REPEATED, {element});
auto array =
GroupNode::Make("array", Repetition::REPEATED, {list}, ConvertedType::LIST);
std::vector<NodePtr> parquet_fields;
parquet_fields.push_back(
GroupNode::Make("my_list", Repetition::OPTIONAL, {array}, ConvertedType::LIST));

// Here the repeated `array` group is itself LIST-annotated and wraps a
// three-level `list` group, so conversion is expected to be rejected.
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, testing::HasSubstr("LIST-annotated groups must not be repeated."),
ConvertSchema(parquet_fields));
}
}

Status ArrowSchemaToParquetMetadata(std::shared_ptr<::arrow::Schema>& arrow_schema,
std::shared_ptr<KeyValueMetadata>& metadata) {
ARROW_ASSIGN_OR_RAISE(
Expand Down Expand Up @@ -1846,7 +1953,9 @@ TEST_F(TestLevels, ListErrors) {
{
::arrow::Status error = MaybeSetParquetSchema(GroupNode::Make(
"child_list", Repetition::REPEATED,
{PrimitiveNode::Make("bool", Repetition::REPEATED, ParquetType::BOOLEAN)},
{GroupNode::Make("list", Repetition::REPEATED,
{PrimitiveNode::Make("element", Repetition::REQUIRED,
ParquetType::BOOLEAN)})},
LogicalType::List()));
ASSERT_RAISES(Invalid, error);
std::string expected("LIST-annotated groups must not be repeated.");
Expand Down
103 changes: 67 additions & 36 deletions cpp/src/parquet/arrow/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,13 @@ Status PopulateLeaf(int column_index, const std::shared_ptr<Field>& field,
}

// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
bool HasStructListName(const GroupNode& node) {
// If the name is array or uses the parent's name with `_tuple` appended,
// this should be:
// - a list of list or map type if the repeated group node is LIST- or MAP-annotated.
// - otherwise, a list of struct even for single child elements.
bool HasListElementName(const GroupNode& node, const GroupNode& parent) {
::std::string_view name{node.name()};
return name == "array" || EndsWith(name, "_tuple");
return name == "array" || name == (parent.name() + "_tuple");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emkornfield Fix the matching of _tuple to follow the spec.

}

Status GroupToStruct(const GroupNode& node, LevelInfo current_levels,
Expand Down Expand Up @@ -598,9 +600,9 @@ Status MapToSchemaField(const GroupNode& group, LevelInfo current_levels,
ctx->LinkParent(value_field, key_value_field);

// required/optional group name=whatever {
// repeated group name=key_values{
// repeated group name=key_values {
// required TYPE key;
// required/optional TYPE value;
// required/optional TYPE value;
// }
// }
//
Expand Down Expand Up @@ -634,6 +636,7 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,
if (group.is_repeated()) {
return Status::Invalid("LIST-annotated groups must not be repeated.");
}

current_levels.Increment(group);

out->children.resize(group.field_count());
Expand All @@ -651,45 +654,73 @@ Status ListToSchemaField(const GroupNode& group, LevelInfo current_levels,

int16_t repeated_ancestor_def_level = current_levels.IncrementRepeated();
if (list_node.is_group()) {
// Resolve 3-level encoding
//
// required/optional group name=whatever {
// repeated group name=list {
// required/optional TYPE item;
// }
// }
//
// yields list<item: TYPE ?nullable> ?nullable
//
// We distinguish the special case that we have
//
// required/optional group name=whatever {
// repeated group name=array or $SOMETHING_tuple {
// required/optional TYPE item;
// }
// }
//
// In this latter case, the inner type of the list should be a struct
// rather than a primitive value
//
// yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
const auto& list_group = static_cast<const GroupNode&>(list_node);
// Special case mentioned in the format spec:
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
// List of primitive type
RETURN_NOT_OK(
NodeToSchemaField(*list_group.field(0), current_levels, ctx, out, child_field));
} else {
if (list_group.field_count() > 1) {
// The inner type of the list should be a struct when there are multiple fields
// in the repeated group
RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
} else if (list_group.field_count() == 1) {
const auto& repeated_field = list_group.field(0);
if (repeated_field->is_repeated()) {
Copy link
Contributor

@emkornfield emkornfield Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the careful explanation of my last questions. After rereading, I mostly agree that this is a bug that needs to be fixed; I think I missed the second (LIST) annotation. Even though this check corresponds to the Java code, it seems the important factor here is that list_group is in fact LIST-annotated (and possibly even MAP-annotated), not that the inner element is repeated?

So I think the logic might make more sense as (pseudocode):

if (list_group.field_count() > 1) {
   ...
} else if (HasListElementName(list_group, group)) {
   if (IsMap(list_group)) {
           RETURN_NOT_OK(
            ListToMapField(*list_group, current_levels, ctx, out, child_field));
      
   } else if (IsList(list_group)) {
      RETURN_NOT_OK(
            ListToSchemaField(*list_group, current_levels, ctx, out, child_field));
  } else {
      RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
  }
} else {
  ...
}

Does this formulation work?

Copy link
Member Author

@wgtmac wgtmac Oct 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this does not work. It has two issues:

// Special case where the inner type might be a list with two-level encoding
// like below:
//
// required/optional group name=SOMETHING (LIST) {
// repeated group array (LIST) {
// repeated TYPE item;
// }
// }
//
// yields list<item: list<item: TYPE not null> not null> ?nullable
if (!list_group.logical_type()->is_list()) {
return Status::Invalid("Group with one repeated child must be LIST-annotated.");
}
// LIST-annotated group with three-level encoding cannot be repeated.
if (repeated_field->is_group() &&
!static_cast<const GroupNode&>(*repeated_field).field(0)->is_repeated()) {
return Status::Invalid("LIST-annotated groups must not be repeated.");
}
RETURN_NOT_OK(
NodeToSchemaField(*repeated_field, current_levels, ctx, out, child_field));
} else if (HasListElementName(list_group, group)) {
// We distinguish the special case that we have
//
// required/optional group name=SOMETHING {
// repeated group name=array or $SOMETHING_tuple {
// required/optional TYPE item;
// }
// }
//
// The inner type of the list should be a struct rather than a primitive value
//
// yields list<item: struct<item: TYPE ?nullable> not null> ?nullable
RETURN_NOT_OK(GroupToStruct(list_group, current_levels, ctx, out, child_field));
} else {
// Resolve 3-level encoding
//
// required/optional group name=whatever {
// repeated group name=list {
// required/optional TYPE item;
// }
// }
//
// yields list<item: TYPE ?nullable> ?nullable
RETURN_NOT_OK(
NodeToSchemaField(*repeated_field, current_levels, ctx, out, child_field));
}
} else {
return Status::Invalid("Group must have at least one child.");
}
} else {
// Two-level list encoding
//
// required/optional group LIST {
// repeated TYPE;
// }
//
// TYPE is a primitive type
//
// yields list<item: TYPE not null> ?nullable
const auto& primitive_node = static_cast<const PrimitiveNode&>(list_node);
int column_index = ctx->schema->GetColumnIndex(primitive_node);
ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> type,
Expand Down
Loading