GH-36778: [C++][Parquet] Add ReadRowGroupAsync/ReadRowGroupsAsync #36779

Closed
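For context: this PR adds `FileReader::ReadRowGroupAsync`/`ReadRowGroupsAsync`, which return an `AsyncBatchGenerator` — a callable yielding `Future<std::shared_ptr<RecordBatch>>`, with a null batch as the end-of-stream sentinel. Below is a minimal consumption sketch, assuming only the signatures exercised by the tests in this diff; the `DrainRowGroups` helper and the blocking `result()` calls are illustrative, not part of the PR.

```cpp
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/thread_pool.h"
#include "parquet/arrow/reader.h"

// Drain every batch from the first two row groups, column 0 only.
::arrow::Status DrainRowGroups(parquet::arrow::FileReader* reader) {
  auto gen = reader->ReadRowGroupsAsync(
      /*row_groups=*/{0, 1}, /*columns=*/{0},
      ::arrow::internal::GetCpuThreadPool());
  while (true) {
    // Each invocation of the generator returns a Future; blocking on
    // result() keeps the sketch simple, but continuations also work.
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<::arrow::RecordBatch> batch,
                          gen().result());
    if (batch == nullptr) break;  // terminal sentinel: stream exhausted
    // ... process `batch` ...
  }
  return ::arrow::Status::OK();
}
```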
217 changes: 217 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -46,13 +46,15 @@
#include "arrow/testing/random.h"
#include "arrow/testing/util.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/config.h" // for ARROW_CSV definition
#include "arrow/util/decimal.h"
#include "arrow/util/future.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/range.h"
#include "arrow/util/thread_pool.h"

#ifdef ARROW_CSV
#include "arrow/csv/api.h"
@@ -2514,6 +2516,137 @@ TEST(TestArrowReadWrite, GetRecordBatchGenerator) {
}
}

TEST(TestArrowReadWrite, ReadRowGroupsAsync) {
constexpr int kNumRows = 1024;
constexpr int kRowGroupSize = 512;
constexpr int kNumColumns = 2;

std::shared_ptr<Table> table;
ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(kNumColumns, kNumRows, 1, &table));

std::shared_ptr<Buffer> buffer;
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, kRowGroupSize,
default_arrow_writer_properties(), &buffer));

for (std::vector<int> row_groups : std::vector<std::vector<int>>{{}, {0}, {0, 1}}) {
ARROW_SCOPED_TRACE("# row_groups = ", row_groups.size());
int32_t expected_total_rows = static_cast<int32_t>(row_groups.size()) * kRowGroupSize;

for (std::vector<int> columns : std::vector<std::vector<int>>{{}, {0}, {0, 1}}) {
ARROW_SCOPED_TRACE("# columns = ", columns.size());

for (int row_group_size : {128, 512, 1024, 2048}) {
[Review comment — Member] nit: row_group_size -> batch_size
ARROW_SCOPED_TRACE("row_group_size = ", row_group_size);

ArrowReaderProperties properties = default_arrow_reader_properties();
properties.set_batch_size(row_group_size);
std::unique_ptr<FileReader> unique_reader;
FileReaderBuilder builder;
ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(properties)->Build(&unique_reader));
auto batch_generator = unique_reader->ReadRowGroupsAsync(
row_groups, columns, ::arrow::internal::GetCpuThreadPool());

int64_t num_expected_batches =
::arrow::bit_util::CeilDiv(expected_total_rows, row_group_size);
std::vector<std::shared_ptr<::arrow::RecordBatch>> batches;
int32_t remaining = expected_total_rows;
for (int64_t i = 0; i < num_expected_batches; i++) {
int32_t expected_num_rows = std::min(remaining, row_group_size);
remaining -= expected_num_rows;
auto batch_fut = batch_generator();
ASSERT_OK_AND_ASSIGN(auto batch, batch_fut.result());

ASSERT_NE(batch, nullptr);
ASSERT_EQ(batch->num_columns(), static_cast<int>(columns.size()));
ASSERT_EQ(batch->num_rows(), expected_num_rows);

batches.push_back(batch);
}
auto terminal_fut = batch_generator();
ASSERT_OK_AND_ASSIGN(auto terminal_batch, terminal_fut.result());
ASSERT_EQ(terminal_batch, nullptr);

std::shared_ptr<::arrow::Table> expected = table->Slice(0, expected_total_rows);
ASSERT_OK_AND_ASSIGN(expected, expected->SelectColumns(columns));

ASSERT_OK_AND_ASSIGN(auto actual, ::arrow::Table::FromRecordBatches(
expected->schema(), std::move(batches)));
AssertTablesEqual(*expected, *actual, /*same_chunk_layout=*/false);
}
}
}
}

TEST(TestArrowReadWrite, ReadRowGroupsAsyncEmptyRowGroup) {
// This test case explores what happens when an empty row group is encountered. We
// create a sample parquet file with two row groups: the first is empty and the
// second has a single row.
constexpr int kNumColumns = 2;

std::vector<std::shared_ptr<Field>> fields;
for (int i = 0; i < kNumColumns; i++) {
fields.push_back(::arrow::field("", ::arrow::float64()));
}
std::shared_ptr<Schema> test_schema = ::arrow::schema(fields);

auto sink = CreateOutputStream();
auto write_props = WriterProperties::Builder().build();
std::unique_ptr<FileWriter> writer;
ASSERT_OK_AND_ASSIGN(writer, FileWriter::Open(*test_schema, default_memory_pool(), sink,
std::move(write_props),
default_arrow_writer_properties()));
ASSERT_OK(writer->NewRowGroup(0));
for (int i = 0; i < kNumColumns; i++) {
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> empty_array,
::arrow::MakeEmptyArray(::arrow::float64()));
ASSERT_OK(writer->WriteColumnChunk(*empty_array));
}
ASSERT_OK(writer->NewRowGroup(1));
for (int i = 0; i < kNumColumns; i++) {
std::shared_ptr<Array> non_empty_array = ArrayFromJSON(::arrow::float64(), "[1]");
ASSERT_OK(writer->WriteColumnChunk(*non_empty_array));
}
ASSERT_OK(writer->Close());
ASSERT_OK_AND_ASSIGN(std::shared_ptr<Buffer> buffer, sink->Finish());

ArrowReaderProperties properties = default_arrow_reader_properties();
std::unique_ptr<FileReader> unique_reader;
FileReaderBuilder builder;
ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
ASSERT_OK(builder.properties(properties)->Build(&unique_reader));

auto AssertEnded = [&](FileReader::AsyncBatchGenerator* gen) {
auto terminal_fut = (*gen)();
ASSERT_OK_AND_ASSIGN(auto terminal_batch, terminal_fut.result());
ASSERT_EQ(terminal_batch, nullptr);
};

auto AssertNonEmptyBatch = [&](FileReader::AsyncBatchGenerator* gen) {
auto batch_fut = (*gen)();
ASSERT_OK_AND_ASSIGN(auto batch, batch_fut.result());

ASSERT_NE(batch, nullptr);
ASSERT_EQ(batch->num_columns(), kNumColumns);
ASSERT_GT(batch->num_rows(), 0);
};

// If the empty row group is the only row group asked for then the generator will end
// immediately without emitting a single batch
auto batch_generator = unique_reader->ReadRowGroupsAsync(
/*row_groups=*/{0}, /*columns=*/{0, 1}, ::arrow::internal::GetCpuThreadPool());

AssertEnded(&batch_generator);

// If the empty row group is not the only row group asked for then it will simply be
// skipped
batch_generator = unique_reader->ReadRowGroupsAsync(
/*row_groups=*/{0, 1}, /*columns=*/{0, 1}, ::arrow::internal::GetCpuThreadPool());

AssertNonEmptyBatch(&batch_generator);
AssertEnded(&batch_generator);
}

TEST(TestArrowReadWrite, ScanContents) {
const int num_columns = 20;
const int num_rows = 1000;
@@ -3963,6 +4096,90 @@ TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}

TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringValue)) {
// ARROW-3762
::arrow::StringBuilder builder;
// 16 rows of 256 MiB each is 4 GiB, written as 4 chunks of 4 rows apiece.
constexpr std::int32_t kValueSize = 1 << 28;
constexpr std::int32_t kNumRows = 4;
constexpr std::int32_t kNumChunks = 4;
std::vector<std::shared_ptr<Array>> chunks;
{
// `value` is large and created in its own scope so it
// can be freed as soon as we are done using it.
std::vector<uint8_t> value(kValueSize, '0');
ASSERT_OK(builder.Resize(kNumRows));
ASSERT_OK(builder.ReserveData(kNumRows * kValueSize));
for (int64_t i = 0; i < kNumRows; ++i) {
builder.UnsafeAppend(value.data(), kValueSize);
}
std::shared_ptr<Array> array;
ASSERT_OK(builder.Finish(&array));
for (int chunk_idx = 0; chunk_idx < kNumChunks; chunk_idx++) {
chunks.push_back(array);
}
}

ASSERT_OK_AND_ASSIGN(std::shared_ptr<ChunkedArray> chunk,
ChunkedArray::Make(std::move(chunks)));

auto table = Table::Make(::arrow::schema({::arrow::field("x", ::arrow::utf8())}),
{std::move(chunk)});
std::shared_ptr<SchemaDescriptor> schm;
ASSERT_OK_NO_THROW(
ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm));

auto sink = CreateOutputStream();

auto schm_node = std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));

auto writer = ParquetFileWriter::Open(sink, schm_node);

std::unique_ptr<FileWriter> arrow_writer;
ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
table->schema(), default_arrow_writer_properties(),
&arrow_writer));
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows()));
ASSERT_OK_NO_THROW(arrow_writer->Close());

ASSERT_OK_AND_ASSIGN(auto tables_buffer, sink->Finish());

// drop to save memory
arrow_writer.reset();
sink.reset();

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
std::unique_ptr<FileReader> arrow_reader;
ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader), &arrow_reader));

// Test ReadRowGroupsAsync; see the note after this test on allow_sliced_batches
auto batch_gen = arrow_reader->ReadRowGroupsAsync(
{0}, /*executor=*/::arrow::internal::GetCpuThreadPool(),
/*allow_sliced_batches=*/false);

auto fut = batch_gen();
ASSERT_RAISES(Invalid, fut.result());

batch_gen = arrow_reader->ReadRowGroupsAsync(
{0}, /*executor=*/::arrow::internal::GetCpuThreadPool(),
/*allow_sliced_batches=*/true);

std::vector<std::shared_ptr<RecordBatch>> batches;
while (true) {
fut = batch_gen();
ASSERT_OK_AND_ASSIGN(auto batch, fut.result());
if (batch == nullptr) {
break;
}
batches.push_back(batch);
}
ASSERT_OK_AND_ASSIGN(auto batched_table, ::arrow::Table::FromRecordBatches(batches));
ASSERT_OK(batched_table->ValidateFull());
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}
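Judging from the assertions above, the `allow_sliced_batches` flag appears to control how the async reader handles a row group whose decoded column data cannot fit in a single in-memory batch (here, string data past the 32-bit offset limit of `utf8()`): `false` surfaces an `Invalid` status, while `true` lets the reader emit several smaller batches. A hedged caller-side sketch under that assumption — `ReadAllSliced` is a hypothetical helper, not part of the PR:

```cpp
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/thread_pool.h"
#include "parquet/arrow/reader.h"

// Collect row group 0 into a Table, letting the reader slice oversized
// batches instead of failing with Invalid.
::arrow::Result<std::shared_ptr<::arrow::Table>> ReadAllSliced(
    parquet::arrow::FileReader* reader) {
  auto gen = reader->ReadRowGroupsAsync(
      {0}, ::arrow::internal::GetCpuThreadPool(),
      /*allow_sliced_batches=*/true);
  std::vector<std::shared_ptr<::arrow::RecordBatch>> batches;
  while (true) {
    ARROW_ASSIGN_OR_RAISE(auto batch, gen().result());
    if (batch == nullptr) break;  // end of stream
    batches.push_back(std::move(batch));
  }
  // Infers the schema from the first batch; requires at least one batch.
  return ::arrow::Table::FromRecordBatches(std::move(batches));
}
```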

TEST(TestArrowReaderAdHoc, HandleDictPageOffsetZero) {
#ifndef ARROW_WITH_SNAPPY
GTEST_SKIP() << "Test requires Snappy compression";