-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-36778: [C++][Parquet] Add ReadRowGroupAsync/ReadRowGroupsAsync #36779
GH-36778: [C++][Parquet] Add ReadRowGroupAsync/ReadRowGroupsAsync #36779
Conversation
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest is ok for me
Is the logic for concat multiple batches from RowGroup implemented? Or it will be implement in the future? |
cpp/src/parquet/arrow/reader.cc
Outdated
// do provide a batch size but even for a small batch size it is possible that a | ||
// column has extremely large strings which don't fit in a single batch. | ||
Future<std::vector<std::shared_ptr<ChunkedArray>>> chunked_arrays_fut = | ||
::arrow::internal::OptionalParallelForAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cpu_executor
is not used here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, here it uses internal::GetCpuThreadPool()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@westonpace Do you want to fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thank you. I have now fixed this in 5e38b00
I think the ColumnReader already supports this internally. |
Oh I see, |
7625dab
to
5e38b00
Compare
cpp/src/parquet/arrow/reader.h
Outdated
/// \param allow_sliced_batches if false, an error is raised if a batch has too much | ||
/// data for the given batch size. If true, smaller | ||
/// batches will be returned instead. | ||
virtual AsyncBatchGenerator ReadRowGroupAsync(int i, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to expose ReadRowGroupAsync
in addition to ReadRowGroupsAsync
? One is a trivial call to the other...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was attempting to maintain parity with the synchronous methods above. I only need one of these four methods and so if you'd prefer I'm happy to scope this down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least the single-row group ReadRowGroupAsync
is a trivial redirect to the several-row groups variant, so removing those two would be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good. I've removed the single-row variants.
/// \param row_groups indices of the row groups to read | ||
/// \param cpu_executor an executor to use to run CPU tasks | ||
/// \param allow_sliced_batches if false, an error is raised if a batch has too much | ||
/// data for the given batch size. If true, smaller |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what "a batch has too much data for the given batch size" means exactly. Do you mean "a row group has too much data for the given batch size"?
Also, why is it false by default? It seems allowing it should be the default behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong feelings on this default. I will switch it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have switched the default to true
/// | ||
/// Note: When reading multiple row groups there is no guarantee you will get one | ||
/// record batch per row group. Data from multiple row groups could get combined into | ||
/// a single batch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, and I agree it's probably desirable. Is it a deviation from other APIs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. This is the same way that the synchronous APIs behave.
cpp/src/parquet/arrow/reader.cc
Outdated
std::shared_ptr<ChunkedArray> chunked_array; | ||
ARROW_RETURN_NOT_OK( | ||
column_reader->NextBatch(rows_in_batch, &chunked_array)); | ||
return chunked_array; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary for this PR, but we'd probably like a Result
returning variant of NextBatch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a result-returning variant.
"The setting allow_sliced_batches is set to false and data was " | ||
"encountered that was too large to fit in a single batch."); | ||
} | ||
state->overflow.push(std::move(next_batch)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this makes the generator not async-reentrant, since operator()
might be called from one thread while this callback runs on another thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I've added some lines to the method doc to make this explicit. There isn't much point in trying to make this async-reentrant since that would require parallel reading of a row group and we don't support that. It might, in theory, be possible, but I think most users get enough parallelism from multiple files / multiple row groups.
} | ||
|
||
// Eaglerly free up memory | ||
value.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear
unfortunately leaves the vector capacity unchanged. Perhaps value = {}
would work...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put value
in its own scope block.
Co-authored-by: mwish <[email protected]> Co-authored-by: Gang Wu <[email protected]>
…est that tested a few strings with very large values (instead of the existing test which tests many many small values) as this is the situation that actually triggers the error.
5e38b00
to
7d78ee9
Compare
…the CPU thread pool. Updated to use the I/O thread pool for I/O
@wgtmac Would you like to review this again? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just some minor comments.
@@ -249,6 +250,63 @@ class PARQUET_EXPORT FileReader { | |||
virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups, | |||
std::shared_ptr<::arrow::Table>* out) = 0; | |||
|
|||
using AsyncBatchGenerator = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cpp/src/arrow/util/async_generator_fwd.h has defined AsyncGenerator below, should we reuse it?
template <typename T>
using AsyncGenerator = std::function<Future<T>()>;
/// \param allow_sliced_batches if false, an error is raised if a batch has too much | ||
/// data for the given batch size. If false, smaller | ||
/// batches will be returned instead. | ||
virtual AsyncBatchGenerator ReadRowGroupsAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not return arrow::Result, so the docstring would be good to include expected return value, especially when allow_sliced_batches
is false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would Future
in arrow actually include Status
, so it would contain the arrow::Result
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add something here.
Would Future in arrow actually include Status?
Yes. A Future in Arrow has an implicit Status. So it is generally invalid to see something like Future<Result<T>>
. There are a few places where we do return Result<Future<T>>
. This typically indicates that "something can fail quickly, in the synchronous part" (e.g. the thread pool is already shut down) and any failure in the asynchronous portion will be communicated in the Future
. However, I generally still prefer to just return Future<T>
in these cases (there is a utility DeferNotOk
which will convert Result<Future<T>>
to Future<T>
)
@@ -316,6 +374,14 @@ class PARQUET_EXPORT ColumnReader { | |||
// the data available in the file. | |||
virtual ::arrow::Status NextBatch(int64_t batch_size, | |||
std::shared_ptr<::arrow::ChunkedArray>* out) = 0; | |||
|
|||
// Overload of NextBatch that returns a Result | |||
virtual ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> NextBatch( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it required to expose this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove it.
for (std::vector<int> columns : std::vector<std::vector<int>>{{}, {0}, {0, 1}}) { | ||
ARROW_SCOPED_TRACE("# columns = ", columns.size()); | ||
|
||
for (int row_group_size : {128, 512, 1024, 2048}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: row_group_size -> batch_size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM!
/// \param allow_sliced_batches if false, an error is raised if a batch has too much | ||
/// data for the given batch size. If false, smaller | ||
/// batches will be returned instead. | ||
virtual AsyncBatchGenerator ReadRowGroupsAsync( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would Future
in arrow actually include Status
, so it would contain the arrow::Result
?
int64_t batch_size, ::arrow::internal::Executor* io_executor, | ||
::arrow::internal::Executor* cpu_executor) final { | ||
Future<> load_fut = ::arrow::DeferNotOk( | ||
io_executor->Submit([this, batch_size] { return LoadBatch(batch_size); })); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it matter that io_executor
would consume some CPU?
LeafReader::LoadBatch
might decompress page, decode records, parse def-rep levels. They're all cpu-intensive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good observation. In a perfect world we would do all of that on a CPU thread. This would help to keep context switches to a minimum. The only work that would happen on the I/O thread would be the RandomAccessFile
call to read the file. However, that requires pushing async further into the parquet code base which would be a lot of work. It's not clear that the benefit would be significant enough to require the work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I see it in comment, just saying that it might harm. Perfect IO might be so tricky and need to wrap RangeCache
and RandomAccessFile
. This looks good to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another point is that seems that LoadBatch
would only depends on sync-io api? So it would not cause deadlock when LoadBatch
waiting for page io? (which could happen when use_thread
with previous scanner?)
std::shared_ptr<ChunkedArray> out; | ||
RETURN_NOT_OK(BuildArray(batch_size, &out)); | ||
for (int x = 0; x < out->num_chunks(); x++) { | ||
RETURN_NOT_OK(out->chunk(x)->Validate()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(this is not related to this patch, but I want to ask, why it's neccessary to Validate it here?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm only guessing as I didn't write the original implementation but my guess is that, for security reasons, it is almost always required to validate because a malicious user could otherwise craft a parquet file that triggers buffer overflow. For example, they could store a list array where one of the offsets is way out of range.
break; | ||
} | ||
if (first) { | ||
if (!state->allow_sliced_batches) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add this in document?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this parameter (allow_sliced_batches
) is documented. Am I misunderstanding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, that's my fault. The document looks good to me
Co-authored-by: Gang Wu <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
auto generator_state = std::make_shared<AsyncBatchGeneratorState>(); | ||
generator_state->io_executor = reader_properties_.io_context().executor(); | ||
generator_state->cpu_executor = cpu_executor; | ||
generator_state->use_threads = reader_properties_.use_threads(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Final question, it's not related to correctness, but it's a bit confusing, I've seen the comment:
/// This method ignores the use_threads property of the ArrowReaderProperties. It will
/// always create a task for each column. To run without threads you should use a
/// serial executor as the CPU executor.
So why is use_threads
introduced here?
Rationale for this change
The rationale is described in #36778
What changes are included in this PR?
New methods are added to
parquet::arrow::FileReader
which read the file asynchronously and respect the batch size property. In addition, these new methods are a bit simpler thanGetRecordBatchGenerator
as they are able to reuse a lot of the code in the synchronous methods.Are these changes tested?
Yes, I've added new unit tests.
Are there any user-facing changes?
Yes, there are new methods available. There should be no breaking changes to any existing methods.