Parquet: Support Page Skipping in Iceberg Parquet Reader #1566

Closed · wants to merge 2 commits

Conversation

@shangxinli (Contributor)

Issue link: #193

@rdblue (Contributor) commented Oct 12, 2020

Thanks for opening this PR, @shangxinli! I'll take a look at this soon. It's great to see you have it working.

@rdblue (Contributor) commented Oct 12, 2020

FYI @aokolnychyi, @prodeezy: this PR adds support for page-level skipping in Parquet.

@@ -582,6 +582,13 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
optionsBuilder = ParquetReadOptions.builder();
}

if (filter != null &&
schema.getAliases() != null &&
Contributor:

What is the rationale behind the schema.getAliases() != null check?

Contributor:

The Iceberg table schema will probably rarely have aliases. I think after my recommendation at #1566 (comment) you should no longer see the NPE and this check can be removed.

I think this is also in alignment with what @rdblue says at #193 (comment):

> When converting to the Parquet FilterApi, the aliases must come from a Schema instance that was created from the MessageType of the file you're pushing filters to.

Contributor Author:

Makes sense.

schema.getAliases() != null &&
ParquetFilters.isSupportedFilter(filter, schema, caseSensitive)) {
optionsBuilder.useRecordFilter(filterRecords);
optionsBuilder.withRecordFilter(ParquetFilters.convert(schema, filter, caseSensitive));
Contributor:

I think the first argument to ParquetFilters.convert should be the file schema and not the table schema. Since Iceberg allows column renaming, a column in the table schema may be named differently in the file. If you look at

String path = schema.idToAlias(ref.fieldId());

it uses field IDs to look up the corresponding column name from the provided (file) schema. You can also see this pattern at

type = schemaReader.getFileMetaData().getSchema();
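
For illustration, a minimal sketch of that suggestion, assuming the file schema comes from an already-open footer reader (the schemaReader shown above) and that ParquetSchemaUtil.convert and ParquetFilters.convert behave as in the surrounding diff; the variable names are illustrative, not the PR's code:

// Sketch only: resolve the filter against the file's own schema so that
// ParquetFilters.convert maps field IDs to the column names written in this file.
MessageType fileType = schemaReader.getFileMetaData().getSchema();   // file schema, not table schema
Schema fileSchema = ParquetSchemaUtil.convert(fileType);             // Iceberg schema carrying the file's names
optionsBuilder.useRecordFilter(filterRecords);
optionsBuilder.withRecordFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));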

Contributor Author:

Do you recommend moving "type = schemaReader.getFileMetaData().getSchema()" to the beginning of this method so that we can use the file schema instead of the table schema? I worry it could result in more storage API calls, because when the condition "if (readerFunc != null || batchedReaderFunc != null)" is true, we don't make this API call today.

Contributor:

We don't have to move it to the absolute top; we can fetch the file schema only when the filter is actually going to be used. It will lead to an extra open call, but I am not sure I see a good way around it.

Contributor Author:

Yes, we call it when we need to convert.

Operation.LT,
Operation.LT_EQ);

private static final Set<Type.TypeID> SUPPORTED_TYPES = ImmutableSet.of(
Contributor:

Any reason why other types like TIMESTAMP, STRING and other binary columns are not supported? Seems like ParquetFilters is able to convert expressions for these types too.

Contributor Author:

Sorry, I was mistakenly looking at the UNSUPPORTED_TYPES defined in ExpressionToSearchArgument.java, which is for ORC rather than Parquet. I will remove these.

private ParquetFilters() {
}

public static boolean isSupportedFilter(Expression expr, Schema schema, boolean caseSensitive) {
Contributor:

Rather than introducing more code to check whether the filter is supported, can we just eagerly try to convert the filter and use the exception as a signal that the filter is unsupported?
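
As a rough sketch of that alternative (assuming, as the conversion code suggests, that an unsupported expression surfaces as an UnsupportedOperationException; the variable names are illustrative):

// Sketch: attempt the conversion once per file and treat failure as "not supported".
FilterCompat.Filter recordFilter = null;
try {
  recordFilter = ParquetFilters.convert(fileSchema, filter, caseSensitive);
} catch (UnsupportedOperationException e) {
  // the expression cannot be expressed as a Parquet filter; read without record filtering
}
if (recordFilter != null) {
  optionsBuilder.useRecordFilter(filterRecords);
  optionsBuilder.withRecordFilter(recordFilter);
}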

Contributor Author:

That is what I originally used, but I changed it because throwing and catching an exception is pretty expensive. In the non-error case we should avoid that, although I understand that maintaining a separate support-check is another pain.

Contributor:

Gotcha! Since this is not in the critical loop of reading rows, I think the cost of catching an exception should not be significant in the grand scheme of things. I don't have a strong opinion on this, though.

Contributor Author:

We would throw it once per file in the case of an unsupported filter. I don't have a strong opinion either.

long blockRowCount = blocks.get(nextRowGroup).getRowCount();
Preconditions.checkState(blockRowCount >= pages.getRowCount(),
"Number of values in the block, %s, does not great or equal number of values after filtering, %s",
blockRowCount, pages.getRowCount());
long rowPosition = rowGroupsStartRowPos[nextRowGroup];
@shardulm94 (Contributor) commented Oct 15, 2020

I am not sure if the nextRowGroup counter and consequently rowPosition will be accurate in case of skipped pages. Having the correct rowPosition relative to the start of the file is critical for applying row-level deletes. Can you add a test to TestSparkParquetReadMetadataColumns which would invoke page skipping?

Contributor:

After reading more about Parquet and page skipping, I don't think my comment above holds true in the general case. It seems the nextRowGroup counter should still accurately reflect the current row group being processed. The only edge case I see is in https://github.com/apache/parquet-mr/blob/fe9ab5d22c602776f44b027ad3cc05609eddba54/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L967, where a whole row group may be skipped if the column index allows all the pages in the row group to be skipped. In such a case, readNextFilteredRowGroup will automatically advance to the next block without our knowledge and hence without the counter being updated. This will be an issue for all further row group skipping and row position calculations in Iceberg's ParquetReader. However, I am not sure there can be a case where Iceberg's row group skipping is not able to skip the row group, but Parquet's page skipping ends up skipping all pages and consequently the whole row group.

@shardulm94 (Contributor) commented Oct 15, 2020

This edge case is probably possible with a carefully crafted file. For example, consider the min/max values of a single column:
Row Group: [1, 1000]
Pages: [[1, 10], [900, 1000]]
The expression column = 500 will match the row group but will not match any of the pages, resulting in the next row group being read automatically without the nextRowGroup counter being updated.

@shangxinli (Contributor, Author) commented Oct 16, 2020

This is such an important comment! That is absolutely possible and we need to handle that case. I looked at the code again and found that even the existing code is missing a 'pages == null' condition check. It seems valid for readNextRowGroup() to return null (https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L899); in that case the caller needs to handle it, but it currently doesn't. It is more complicated for readNextFilteredRowGroup() because it can advance internally without us knowing. I will add some handling code there. If you have suggestions, let me know.
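
As a minimal sketch of the missing guard (the reader handle and the flag name are assumptions, not the PR's code):

// Sketch: readNextRowGroup()/readNextFilteredRowGroup() may return null when no more
// row groups (or no matching pages) remain, so the caller must stop instead of
// dereferencing the result.
PageReadStore pages = reader.readNextFilteredRowGroup();
if (pages == null) {
  hasMoreRowGroups = false;  // assumed flag: end of file, or everything was filtered out
} else {
  long rowCount = pages.getRowCount();  // rows that survived page-level filtering
  // ... set up column iterators for this row group ...
}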

Contributor Author:

@shardulm94, after looking into the code more deeply, I think there are two possible solutions here. 1) Change Parquet so that readNextFilteredRowGroup() returns not only the filtered pages but also the record count it skipped. 2) In the ReadConf constructor, when deciding 'shouldRead', also apply the ColumnIndex filter. That would ensure that whenever readNextFilteredRowGroup() is called for a row group, at least one page is returned, so there is no implicit internal advance without Iceberg knowing about it. Here is the place where the 'shouldRead' boolean is determined: https://github.com/apache/iceberg/blob/master/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java#L106

For option 1, we would need to wait for a new release of Parquet. I am not sure releasing 1.10.2 is an option for Parquet; if not, we have to wait for Parquet 1.12.0. Option 2 requires a pretty significant change, and we would have to make sure the implementation here stays consistent with Parquet's ColumnIndex filter implementation.

Please share your thoughts and we can discuss.

Contributor Author:

@shardulm94, I talked to the Parquet community (PARQUET-1927) and it seems we don't need a Parquet change anymore. What we can do is use the ParquetFileReader constructor's filter (page-level stats, row group stats, dictionary, and the future bloom filter). All the subsequent calls in ReadConf, like reader.getRowGroups() and reader.getFilteredRecordCount(), then return filtered values. It also simplifies the ReadConf constructor and ParquetReader/VectorizedParquetReader.
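
A minimal sketch of what that looks like (the builder flags and reader calls are from parquet-mr's public API; the surrounding wiring and variable names are assumed):

// Sketch: let ParquetFileReader apply the stats, dictionary, and column-index filters,
// then consume the already-filtered row groups and record count.
ParquetReadOptions options = ParquetReadOptions.builder()
    .withRecordFilter(parquetFilter)  // FilterCompat.Filter built from the Iceberg expression
    .useColumnIndexFilter(true)
    .build();
ParquetFileReader reader = new ParquetFileReader(inputFile, options);
List<BlockMetaData> rowGroups = reader.getRowGroups();       // filtered by stats and dictionary
long filteredRecordCount = reader.getFilteredRecordCount();  // accounts for column-index page skipping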

Contributor:

@shangxinli After going through the code, I don't think this solves the problem. ParquetFileReader.getRowGroups does not apply column index filters (only stats and dictionary). So there is still the case where getRowGroups will return a block that is then skipped entirely by readNextFilteredRowGroup(). I think the current test case does not accurately capture this.

Overall, I don't think page skipping plays nicely with Iceberg's requirement to accurately compute row numbers for row-level deletes. For example, Iceberg currently stores the row number of the start of each row group, but with page skipping, some pages in the middle of the row group may be skipped and we would have no way to figure out the accurate row numbers. So we may have to disable page skipping when we want to fetch row numbers.

cc: @rdblue
Having thought about this issue deeply in the Parquet context, I now have doubts about whether our ORC implementation for row numbers suffers from the same issue. I will have to revisit this for ORC.

@shardulm94 (Contributor)

I guess

also needs to be changed to use valuesRead + skippedValues just like hasNext()
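
The pattern being referenced, as a sketch (the counter and field names are taken from the comment and are assumptions about the surrounding iterator, not verified against the PR):

// Sketch: progress checks must count rows skipped by page filtering as consumed,
// otherwise the iterator never reaches totalValues for partially skipped row groups.
public boolean hasNext() {
  return valuesRead + skippedValues < totalValues;
}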

int totalCount = getPageRecordCount(parquetFile, null);
int filterCount = getPageRecordCount(parquetFile,
Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", 1)));
Assert.assertTrue(filterCount < totalCount);
@shardulm94 (Contributor) commented Oct 15, 2020

Shouldn't this end up reading one and only one page? Can we make the assertion more strict?

Contributor Author:

Yes. I changed the test code to compare against a single page's record count.
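
A sketch of the stricter check, assuming the test knows how many records each page holds (RECORDS_PER_PAGE is an assumed test constant, not from the PR; getPageRecordCount is the helper from the snippet above):

// Sketch: with page statistics written, the equality predicate should prune down to
// exactly one page, so the filtered count should equal one page's worth of records.
int filterCount = getPageRecordCount(parquetFile,
    Expressions.and(Expressions.notNull("intCol"), Expressions.equal("intCol", 1)));
Assert.assertEquals(RECORDS_PER_PAGE, filterCount);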

@prodeezy (Contributor)

Nice work @shangxinli! Do you plan to add any jmh benchmark tests to illustrate the filtering benefits?

@shangxinli (Contributor, Author)

> Nice work @shangxinli! Do you plan to add any jmh benchmark tests to illustrate the filtering benefits?

I am new to Iceberg. Can you add more details about 'jmh'? My plan is to have unit tests along with this PR. I can then open another PR if benchmarking tests and other tests are needed.

@prodeezy (Contributor)

> I am new to Iceberg. Can you add more details about 'jmh'? My plan is to have unit tests along with this PR. I can then open another PR if benchmarking tests and other tests are needed.

JMH is a toolkit for microbenchmarks in Java. There are some tests here in Iceberg that use it:
https://github.com/apache/iceberg/tree/c9ccdfef4fd7798cd824047cf4e8077c31c1c9e1/spark2/src/jmh/java/org/apache/iceberg/spark/data/parquet
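
For reference, a bare-bones JMH skeleton in the style of those benchmarks (the class name and measured body are hypothetical placeholders, not part of this PR):

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

// Hypothetical skeleton: a real benchmark would read a prepared Parquet file with and
// without the filter pushed down and compare throughput.
@State(Scope.Benchmark)
public class ParquetPageSkippingBenchmark {

  @Benchmark
  public long readWithFilterPushdown() {
    // would open the file with the filter applied and count the rows returned
    return 0L;
  }
}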

@shangxinli (Contributor, Author)

@shardulm94 Do you have time to take another look?

@shardulm94 (Contributor)

@shangxinli Yes, I started reviewing this yesterday and plan to complete it today. Just relying on Parquet to do the filtering (stats/dictionary/page skipping/bloom filters) would greatly simplify Iceberg code, but I am not sure why the initial implementation chose to apply filters ourselves. @rdblue do you have any insight here?
This would also disable pushdown of some filters like IN, NOT_IN, and STARTS_WITH, so it may still be a good idea to keep the Iceberg-side row group pruning logic.
Another thing to consider is the parity between Iceberg's filters and Parquet's filters with respect to null semantics, as we found out for ORC in #1536.

@shangxinli (Contributor, Author) commented Nov 3, 2020

@shardulm94 I am also curious about why Iceberg reimplements those filters. @rdblue can you shed some light on this?

I can see the pros and cons of reimplementing. The concern with reimplementing is that it creates duplication and fragmentation. As a short-term workaround it is fine, but in the long run I think deduplicating to one implementation makes more sense. Otherwise, whenever new filters are added in Parquet, we need to reimplement them here: column index is one, the bloom filter is another on the way, and so on. If some filters exist in Iceberg but not in Parquet, we can bring them to the Parquet community to add. What do you think?

@rdblue (Contributor) commented Nov 3, 2020

There are a few reasons why Iceberg reimplemented the filters.

  • Faster iteration and more features without needing Parquet releases
  • Parquet's filter API has some problems
    • Evaluation is negated (canDrop vs shouldRead), which has led to more bugs
    • It is missing some predicates that we need to be well supported, like startsWith, in, alwaysTrue, and alwaysFalse
    • The API is very difficult to work with
  • Iceberg replaces record materialization, so we would need to run these filters from Iceberg code anyway
  • Iceberg had already implemented similar filters, like stats evaluation, so it was simple to reuse that code

To fix some of the issues with the Parquet API, my hope was that eventually Parquet would use Iceberg's expression API and filters in place of its own. We'd need to refactor a bit to make this happen, but I think it would still be a good option. There are several things that I think would be great to standardize across some of the storage projects like the FileIO classes and the expressions.

@rdblue (Contributor) commented Nov 3, 2020

One more thing, we also wanted to use Iceberg's comparators. I think we had those done before Parquet introduced its comparators, and Iceberg also has strong support for CharSequence without conversion to String.

@@ -623,17 +630,10 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
if (filter != null) {
// TODO: should not need to get the schema to push down before opening the file.
// Parquet should allow setting a filter inside its read support
MessageType type;
Contributor:

The comment above sounds like another possible reason why we wanted to reimplement filters. Tailoring the filter to each file is difficult, compared to evaluating the same filter against every file.

If I remember correctly, the need to tailor the filter for the file is because we use id-based column resolution. So the file might contain 1: a int and Iceberg has a filter for 1: x long. Unless the filter is translated to use a instead of x, Parquet will skip the file because it doesn't think the column exists (and is all nulls).

if (filter != null) {
statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
}
Contributor:

This PR removes support for pushing down filters like startsWith and in?

@shangxinli (Contributor, Author)

@rdblue, makes sense. Now the question is: do we also want to reimplement the ColumnIndex filter? And the same question applies to the future bloom filter. Since we already have the other types of filters reimplemented, it makes more sense to do the same thing. It is just more effort, but I can do it if Iceberg decides to reimplement all of the Parquet filters.

If we don't reimplement, we might need Parquet to release 1.11.2 (we discussed this in the last Parquet meeting and it is possible if needed) with a fix to expose the skipped record count. But we would still have the same issue you mentioned above (IN, startsWith not supported, etc.).

@shardulm94, I haven't addressed your comments yet. If we decide to reimplement the ColumnIndex filter, then we cannot call readNextFilteredRowGroup(); instead, we will filter pages ourselves (maybe in BaseColumnIterator? I need to look into it more). So I will address them later, once we have agreement here.

@rdblue (Contributor) commented Nov 4, 2020

@shangxinli, I think we probably want to build the filtering into Iceberg. I could be convinced otherwise, but it would take a lot of work to use Parquet filters.

After my comment yesterday, I was looking at the implementation here and noticed another problem: Parquet expects a filter that references columns using the name that was written into each data file. Iceberg allows renaming and so the Iceberg filters are written to use field IDs instead of names. That is needed for correctness. If I rename field a to x, then I have to rewrite the filter x > 5 to a > 5 when reading an old data file, or else Parquet thinks there are no matching records because column x is not present and is all nulls.

Parquet also doesn't support customizing the filter in ReadSupport, so to do this you have to read each file's footer to get its schema, rewrite the filter, and then open the file again to read it.

I'm remembering more of the issues with the original filter path now, and I think that it makes more sense to do the work in Iceberg for now, with the intent to get the work upstream in a 2.0 Parquet API that fixes some of these issues.

What do you think?

@shangxinli (Contributor, Author)

@rdblue I think it makes sense.

@shangxinli (Contributor, Author)

@rdblue I looked at the implementation of ColumnIndex. Here is what we need to do to rewrite this filter and use it in Iceberg.

  1. Rewrite the equivalent of the filters in Parquet's ColumnIndexFilter class.
  2. Rewrite ParquetFileReader#getRowRanges(), which applies the column index filters and produces the row ranges (RowRanges).
  3. Use the filtered rowRanges obtained in step 2 to build a PageReadStore like ColumnChunkPageReadStore (https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L976). Unfortunately, ColumnChunkPageReadStore is not a public class that Iceberg can use, so this would require writing our own PageReadStore or asking Parquet to make it public.
  4. Use the filtered rowRanges to get the filteredOffsetIndex, with which we can read all the parts needed. Then we need a SynchronizingColumnReader to synchronize across columns. Unfortunately, SynchronizingColumnReader is not a public class either; we would need to rewrite it in Iceberg.
  5. Wrap up steps 2, 3, and 4 into a method that replaces readNextFilteredRowGroup() in Parquet.

The work is more than just rewriting the filter: it also means rewriting ColumnChunkPageReadStore, SynchronizingColumnReader, and parts of ParquetFileReader. This makes me wonder whether we should continue rewriting or reuse instead. If we choose to reuse, we can limit the filter so that it does not support startsWith, in, etc., or we can add them to Parquet. Let me know your thoughts.

@iflytek-hmwang5

@shangxinli Hi, any updates on it?

github-actions bot:

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

github-actions bot added the stale label on Jul 27, 2024.
github-actions bot commented Aug 3, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions bot closed this on Aug 3, 2024.