
GH-32566: [C++] Connect parquet to the new scan node #35889

Closed
wants to merge 13 commits

Conversation

@westonpace (Member) commented Jun 2, 2023

Rationale for this change

This ended up being considerably more of a change than just connecting parquet to the new scan node. In order to do this I had to refactor the scan node itself somewhat. It introduces the concept of scan tasks (or maybe scan streams would be a better name) to help clarify the concept of a row group (which I didn't have to worry about with CSV). I also introduced the staging area, which is a slightly different approach to sequencing that I think will be much simpler.

What changes are included in this PR?

The new scan node now supports the parquet format.

Are these changes tested?

Yes

Are there any user-facing changes?

There are breaking changes to the scan2 node but this feature hasn't really been released yet.

@westonpace (Member Author)

This is very much still a draft. There are still a lot of tests to add and some TODOs (column projection and row filtering) but I don't expect the overall structure to change too much if anyone wanted to take an early look.

@westonpace (Member Author)

On the bright side, I can now reach max parallelism with about 3GB of RAM, regardless of the size of row groups (and performance looks to be about 10% better, but it is still very early to say that).

… considerably. Now each fragment contains one or more scan tasks. Each scan task can yield a stream of batches. So, CSV, for example, is a single scan task that covers the entire file. Parquet, on the other hand, has a scan task per row group. This also makes explicit a lot of the logic that was implicit around sequencing and trying to figure out the correct batch index.
@mapleFU (Member) left a comment


So, would this prevent the use_threads deadlock?

@@ -16,10 +16,12 @@
// under the License.

#include "parquet/arrow/reader.h"
#include <sys/types.h>
Member


Would we really need this?

Member Author


No, thank you for noticing. I will clean this up soon.

}
if (!first) {
// TODO(weston): Test this case
return Status::Invalid("Unexpected empty row group");
Member


Hmmm, I guess a RowGroup can currently be empty. You can easily generate a case like this using Python's write_table:

  len(table) == 10000
  write_table(table, row_group_size=2000)

Member Author


Doesn't this create a table with 5 row groups? Why would this be an empty row group?

Member


  Status WriteTable(const Table& table, int64_t chunk_size) override {
    RETURN_NOT_OK(table.Validate());

    if (chunk_size <= 0 && table.num_rows() > 0) {
      return Status::Invalid("chunk size per row_group must be greater than 0");
    } else if (!table.schema()->Equals(*schema_, false)) {
      return Status::Invalid("table schema does not match this writer's. table:'",
                             table.schema()->ToString(), "' this:'", schema_->ToString(),
                             "'");
    } else if (chunk_size > this->properties().max_row_group_length()) {
      chunk_size = this->properties().max_row_group_length();
    }

    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
      RETURN_NOT_OK(NewRowGroup(size));
      for (int i = 0; i < table.num_columns(); i++) {
        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
      }
      return Status::OK();
    };

    if (table.num_rows() == 0) {
      // Append a row group with 0 rows
      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
      return Status::OK();
    }

    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
      int64_t offset = chunk * chunk_size;
      RETURN_NOT_OK_ELSE(
          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
          PARQUET_IGNORE_NOT_OK(Close()));
    }
    return Status::OK();
  }

It's from this code. It's easy to end up flushing a row group whose row count is 0.

Member Author


I manually created some empty row groups. It turns out this branch should be unreachable because, further up, we will have noticed that "remaining rows" is 0 and returned an end marker. I've updated this code and added a test case for this scenario in #36779.

Member


Okay, great!

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Jul 18, 2023
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Jul 18, 2023
@westonpace (Member Author)

So, would this prevent the use_threads deadlock?

Yes. Since this method is async, the caller can choose not to block. Previously we used GetRecordBatchGenerator, which is very similar to the method I ended up creating (ReadRowGroupsAsync). The difference is that GetRecordBatchGenerator ignores batch_size and ReadRowGroupsAsync does not. My hope is that, eventually, GetRecordBatchGenerator can be deprecated and removed.

@westonpace westonpace marked this pull request as ready for review July 18, 2023 23:16
@westonpace westonpace requested a review from wgtmac as a code owner July 18, 2023 23:16
@westonpace (Member Author)

I think the changes here are probably too extensive to expect review. I will be breaking this PR up into multiple PRs.

  • Adding a new ReadRowGroupsAsync to the file reader
  • Reworking the scan node
  • Adding parquet support

@westonpace westonpace marked this pull request as draft July 18, 2023 23:18
@westonpace westonpace removed the request for review from wgtmac July 18, 2023 23:18
@westonpace (Member Author)

westonpace commented Jul 19, 2023

@westonpace (Member Author)

The second PR is now available. Once the two pre-reqs merge I will undraft this.

@westonpace westonpace closed this Apr 16, 2024
Development

Successfully merging this pull request may close these issues.

[C++] Create fragment scanners for csv/parquet/orc/ipc
2 participants