Addressed concerns from code review. Moved the large string test into a test that uses a few strings with very large values (instead of the existing test, which tests many small values), as this is the situation that actually triggers the error.
westonpace committed Jul 27, 2023
1 parent 85eadc5 commit 5e38b00
Showing 2 changed files with 71 additions and 12 deletions.
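For background: arrow::StringArray stores its value offsets as 32-bit integers, so a single chunk can hold at most about 2 GiB of character data. A few 256 MiB values are therefore enough to force the Parquet reader to split the column into multiple chunks on read, whereas many small values never hit that limit. A minimal standalone sketch of the arithmetic (illustrative only, using the same sizes as the new test):

#include <cstdint>
#include <iostream>
#include <limits>

int main() {
  // Same shape as the test: 16 values of 256 MiB each, 4 GiB of string data in total.
  constexpr std::int64_t kValueSize = std::int64_t{1} << 28;  // 256 MiB per value
  constexpr std::int64_t kTotalRows = 16;
  // A StringArray chunk's offsets are int32, so one chunk holds < 2^31 bytes.
  constexpr std::int64_t kOffsetLimit = std::numeric_limits<std::int32_t>::max();

  const std::int64_t total_bytes = kValueSize * kTotalRows;       // 4 GiB
  const std::int64_t rows_per_chunk = kOffsetLimit / kValueSize;  // 7 values per chunk
  std::cout << "total bytes: " << total_bytes << "\n";
  std::cout << "max rows per chunk: " << rows_per_chunk << "\n";
  // 16 rows at 7 per chunk -> chunks of 7, 7, and 2 rows on read, which is the
  // chunked path this kind of test is meant to exercise.
  return 0;
}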
74 changes: 66 additions & 8 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -35,7 +35,6 @@
#include "arrow/array/builder_dict.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/util.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api.h"
#include "arrow/io/api.h"
@@ -4095,6 +4094,63 @@ TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {

ASSERT_OK(batched_table->ValidateFull());
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}

TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringValue)) {
// ARROW-3762
::arrow::StringBuilder builder;
// Each of the kNumChunks (4) chunks holds kNumRows (4) values of 256 MiB each,
// for 16 rows and 4 GiB in total. When read back, the data gets split into
// 3 chunks of 7 rows, 7 rows, and 2 rows, since a chunk's offsets are int32.
constexpr std::int32_t kValueSize = 1 << 28;
constexpr std::int32_t kNumRows = 4;
constexpr std::int32_t kNumChunks = 4;
std::vector<std::shared_ptr<Array>> chunks;
std::vector<uint8_t> value(kValueSize, '0');
for (int chunk_idx = 0; chunk_idx < kNumChunks; chunk_idx++) {
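// Pre-size the builder: Resize reserves space for kNumRows offsets and
// ReserveData reserves the value bytes, so the UnsafeAppend calls below can
// skip per-call capacity checks.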
ASSERT_OK(builder.Resize(kNumRows));
ASSERT_OK(builder.ReserveData(kNumRows * kValueSize));
for (int64_t i = 0; i < kNumRows; ++i) {
builder.UnsafeAppend(value.data(), kValueSize);
}
std::shared_ptr<Array> array;
ASSERT_OK(builder.Finish(&array));
chunks.push_back(std::move(array));
}

// Eagerly free up memory
value.clear();
value.shrink_to_fit();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ChunkedArray> chunk,
ChunkedArray::Make(std::move(chunks)));

auto table = Table::Make(::arrow::schema({::arrow::field("x", ::arrow::utf8())}),
{std::move(chunk)});
std::shared_ptr<SchemaDescriptor> schm;
ASSERT_OK_NO_THROW(
ToParquetSchema(table->schema().get(), *default_writer_properties(), &schm));

auto sink = CreateOutputStream();

auto schm_node = std::static_pointer_cast<GroupNode>(
GroupNode::Make("schema", Repetition::REQUIRED, {schm->group_node()->field(0)}));

auto writer = ParquetFileWriter::Open(sink, schm_node);

std::unique_ptr<FileWriter> arrow_writer;
ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), std::move(writer),
table->schema(), default_arrow_writer_properties(),
&arrow_writer));
ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table, table->num_rows()));
ASSERT_OK_NO_THROW(arrow_writer->Close());

ASSERT_OK_AND_ASSIGN(auto tables_buffer, sink->Finish());

// drop to save memory
arrow_writer.reset();
sink.reset();

auto reader = ParquetFileReader::Open(std::make_shared<BufferReader>(tables_buffer));
std::unique_ptr<FileReader> arrow_reader;
ASSERT_OK(FileReader::Make(default_memory_pool(), std::move(reader), &arrow_reader));

// Test reading the data back with ReadRowGroupAsync
auto batch_gen = arrow_reader->ReadRowGroupAsync(
@@ -4109,13 +4165,15 @@ TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
/*allow_sliced_batches=*/true);

std::vector<std::shared_ptr<RecordBatch>> batches;
fut = batch_gen();
ASSERT_OK_AND_ASSIGN(auto batch, fut.result());
batches.push_back(batch);
fut = batch_gen();
ASSERT_OK_AND_ASSIGN(batch, fut.result());
batches.push_back(batch);
ASSERT_OK_AND_ASSIGN(batched_table, ::arrow::Table::FromRecordBatches(batches));
while (true) {
fut = batch_gen();
ASSERT_OK_AND_ASSIGN(auto batch, fut.result());
if (batch == nullptr) {
break;
}
batches.push_back(batch);
}
ASSERT_OK_AND_ASSIGN(auto batched_table, ::arrow::Table::FromRecordBatches(batches));
ASSERT_OK(batched_table->ValidateFull());
AssertTablesEqual(*table, *batched_table, /*same_chunk_layout=*/false);
}
9 changes: 5 additions & 4 deletions cpp/src/parquet/arrow/reader.cc
@@ -323,11 +323,11 @@ class FileReaderImpl : public FileReader {
::arrow::internal::Executor* cpu_executor, bool allow_sliced_batches);

AsyncBatchGenerator ReadRowGroupsAsync(const std::vector<int>& row_groups,
const std::vector<int>& indices,
const std::vector<int>& column_indices,
::arrow::internal::Executor* cpu_executor,
bool allow_sliced_batches) override {
Result<AsyncBatchGenerator> batch_gen =
DoReadRowGroupsAsync(row_groups, indices, cpu_executor, allow_sliced_batches);
Result<AsyncBatchGenerator> batch_gen = DoReadRowGroupsAsync(
row_groups, column_indices, cpu_executor, allow_sliced_batches);
if (batch_gen.ok()) {
return batch_gen.MoveValueUnsafe();
}
@@ -1319,7 +1319,8 @@ class AsyncBatchGeneratorImpl {
ARROW_RETURN_NOT_OK(
column_reader->NextBatch(rows_in_batch, &chunked_array));
return chunked_array;
});
},
state_->cpu_executor);

// Grab the first batch of data and return it. If there is more than one batch then
// throw the remaining batches into overflow and they will be fetched on the next call
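The comment above describes how the generator behaves when one decode step yields more than one batch: the first batch is returned immediately and the rest go into an overflow queue to be served on later calls. A minimal standalone sketch of that pattern (illustrative names only, not the actual FileReaderImpl / AsyncBatchGeneratorImpl code):

#include <cstddef>
#include <deque>
#include <memory>
#include <vector>

struct Batch {};  // stand-in for arrow::RecordBatch

class OverflowingGenerator {
 public:
  // Returns one batch per call. When a single decode step yields several
  // batches (e.g. a binary column that had to be split into multiple chunks),
  // the extras are stashed in an overflow queue and served on later calls.
  std::shared_ptr<Batch> Next() {
    if (!overflow_.empty()) {
      std::shared_ptr<Batch> batch = overflow_.front();
      overflow_.pop_front();
      return batch;
    }
    std::vector<std::shared_ptr<Batch>> decoded = DecodeNextRowGroup();
    if (decoded.empty()) return nullptr;  // end of stream
    for (std::size_t i = 1; i < decoded.size(); ++i) {
      overflow_.push_back(decoded[i]);
    }
    return decoded[0];
  }

 private:
  // Placeholder for the real decode step; the actual reader does this
  // asynchronously and may produce more than one batch at a time.
  std::vector<std::shared_ptr<Batch>> DecodeNextRowGroup() { return {}; }

  std::deque<std::shared_ptr<Batch>> overflow_;
};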
