Add NaN counter to Metrics and implement in Parquet writers #1641

Merged: 9 commits merged into apache:master on Nov 12, 2020

Conversation

@yyanyy (Contributor, Author) commented Oct 22, 2020

This change adds a NaN counter to the Metrics model and updates it during Parquet writing. I believe it only touches internal models and does not write the new attribute to output files. This is the first step towards implementing the spec change defined in #348.
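At a high level, the value writers count NaN values as they flow through and expose the count as per-field metrics afterwards, roughly along these lines (a simplified sketch, not the exact classes in this PR):

// Simplified sketch of the idea (hypothetical class, not the PR's code): a float writer
// counts NaN values as they are written and exposes the count for the Metrics model.
class FloatNaNCountingWriter {
  private final int id;          // Iceberg field id
  private long nanValueCount = 0;

  FloatNaNCountingWriter(int id) {
    this.id = id;
  }

  void write(float value) {
    if (Float.isNaN(value)) {
      nanValueCount += 1;        // tracked in memory; surfaced via Metrics, not written to the data file
    }
    // ... delegate to the underlying Parquet column writer here
  }

  long nanValueCount() {
    return nanValueCount;
  }
}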

Questions:

  • As mentioned in a comment I highlighted, SparkTableUtil (essentially importSparkTable()) (link) reads metrics from the Parquet footer directly, and thus won't populate NaN counts. If we don't want to accept that limitation, we may need to switch ParquetFileReader.readFooter() to the internal Parquet reader and enable metric collection during reading, but this could be much more expensive than the current approach. Do people have better suggestions?
  • The current change doesn't help with removing NaN from the lower/upper bounds, since the Parquet library doesn't treat NaN specially in its min/max stats. I'm thinking of using the same approach to populate the upper and lower bounds, and wondering if people have better suggestions.

Wanted to submit a draft to gather comments on the approach in general. Will add more tests later.

Comment on lines 75 to 77:

public static Metrics fileMetrics(InputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) {
  try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
-   return footerMetrics(reader.getFooter(), metricsConfig, nameMapping);
+   return footerMetrics(reader.getFooter(), Stream.empty(), null, metricsConfig, nameMapping);
Contributor Author:

This has a similar problem to the one I mentioned in the PR description for importing Spark tables: if the file itself is passed in directly, there's not much chance to get the additional metrics tracked by the value writers. Currently fileMetrics is only used by tests. Do people have suggestions on this?

Contributor:

In that case, we should just set the NaN count map to null. I don't think that we want to scan imported files to create these metrics. Also, I believe that we can rely on recent Parquet versions to not produce min or max values that are NaN, so it should be safe to use these as long as we check that they are not NaN.

Contributor Author:

Thank you! I'll leave this as Stream.empty for now.

Regarding relying on recent Parquet versions to not produce min or max values that are NaN, it sounds like that also answers my question in the PR description (i.e. we won't follow this approach to populate the upper and lower bounds). In my current code base it looks like Parquet still gives us NaN as the max; do you happen to have a reference to the Parquet version that handles NaN properly? From a quick search I wasn't able to find it; I noticed this but I suspect it's for something else. I'll look into it more deeply if you don't have it handy.

so it should be safe to use these as long as we check that they are not NaN.

Sorry, just to make sure I understand this correctly: it sounds like only the following three cases will be valid:

  1. v1 table, no NaN counter, min/max could have NaN - use the existing logic, we can't do much about min/max==NaN
  2. v2 table, NaN counter exists, min/max will not be NaN - in this case metrics are produced by iceberg writer
  3. v2 table, no NaN counter, min/max will not be NaN - in this case the file is imported or from this fileMetrics

Then to accommodate (3), the evaluators will have to remember that the absence of a NaN counter doesn't necessarily mean there are no NaN values in the column; but that might be fine since we will need this logic to accommodate (1) as well (unless we implement the evaluators in a way that can differentiate v1/v2 tables; not sure if we want that).
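In evaluator terms that would look roughly like this (an illustrative sketch only; the map and helper names are assumed, not actual evaluator code):

import java.util.Map;

// Illustrative sketch: a missing NaN count means "unknown", not "no NaN values".
static boolean mayContainNaN(Map<Integer, Long> nanCounts, int fieldId) {
  if (nanCounts == null || !nanCounts.containsKey(fieldId)) {
    return true;   // no information: assume the column may contain NaN (v1 or imported files)
  }
  return nanCounts.get(fieldId) > 0;
}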

Contributor:

I don't have a reference for the Parquet fix. I think I was in a sync where it was discussed. Maybe we should generate our own lower/upper bounds for Parquet then.

Float.compare will sort NaN values last, so if we do get max=NaN from Parquet our evaluators should still work as expected. It will just include a much larger range than necessary. If we can generate better stats for table metadata, then that would be ideal.
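For illustration (not part of the PR), Float.compare orders NaN after every other value:

// Quick illustration: Float.compare treats NaN as larger than any non-NaN value.
public class NaNCompareExample {
  public static void main(String[] args) {
    System.out.println(Float.compare(Float.NaN, Float.MAX_VALUE));          // positive: NaN sorts last
    System.out.println(Float.compare(1.0f, Float.NaN));                     // negative
    System.out.println(Float.compare(Float.NaN, Float.POSITIVE_INFINITY));  // positive
  }
}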

Your cases look correct, except that I would say that we expect only max=NaN or min=max=NaN from Parquet. Using the existing logic should be okay.

I agree that the lack of a NaN counter means that the value is unknown. This is the case for all files written to v1 tables. V2 tables will generally have the NaN counter values, but not in all cases (imported files).

Contributor Author:

Thank you! I guess I'll revisit the min/max stats for Parquet after the NaN-related code is mostly done.

@@ -146,8 +146,13 @@ public UTF8String getUTF8String(int ordinal) {

Contributor Author:

The change to this class was mostly to allow using it in TestSparkParquetMergingMetrics. Currently this class is only used for reading rows from metadata tables (in RowDataReader; I think only metadata tables produce DataTask).

In the tests I wanted to convert Record to InternalRow for testing the Spark appender. I was debating whether I should expand this class beyond its current usage or write a new converter. Here are the two things I needed to change to (partially) implement the former:

  1. Null handling, which results in the change to get(): RowDataReader doesn't call get() directly (it uses Dyn reflection to read individual attributes and skips nulls) when converting into another Spark internal row representation (UnsafeRow in this case), so we didn't see an issue. However, when the Spark Parquet writer uses get(), an NPE occurs without this change. Note that even after this change, other uses of this class (e.g. getUTF8String()) are still not null safe, and I wonder if people have an opinion on whether we want full null-safe support across all methods in this class.
  2. The getBinary() change: currently we convert the fixed type to binary for Spark (link), the method I used for creating random records generates the fixed type as byte[] (link), and before this change getBinary() didn't work for the byte[] representation of the fixed type. Alternatively we could wrap the fixed type in the random record generator the same way we do for the binary type. I decided to do the former to allow binary-related types to be more flexible when wrapped in this class, but I guess this comes back to the question of whether we want to evolve this class or create a separate converter.

Contributor:

For #1, I'm fine adding the null check since this isn't used in a high performance code path, but it would be better to have the code that uses this call isNullAt directly because that is the contract for Spark's InternalRow.

For #2, since getBinary is called for both fixed and binary types, I think we do need to check the type of the object. I'd much rather do that using instanceof rather than catching an exception. Can you update it to use struct.get(ordinal, Object.class) and then check the type of the object returned?
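Something along these lines (a rough sketch of the suggestion, not the final code; the ByteBuffer branch and the ByteBuffers helper are assumptions):

@Override
public byte[] getBinary(int ordinal) {
  // Sketch: fetch the value without a target type and branch on its runtime representation.
  // Assumes java.nio.ByteBuffer and org.apache.iceberg.util.ByteBuffers are imported.
  Object bytes = struct.get(ordinal, Object.class);
  if (bytes instanceof byte[]) {
    return (byte[]) bytes;                                // fixed values generated as byte[]
  } else if (bytes instanceof ByteBuffer) {
    return ByteBuffers.toByteArray((ByteBuffer) bytes);   // binary values stored as ByteBuffer
  } else {
    throw new IllegalArgumentException("Unsupported binary representation: " + bytes.getClass());
  }
}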

Contributor Author:

Thanks for the information!

For 1, thanks for the info! I wasn't aware of the contract.

I wasn't sure whether we want to add isNullAt for this specific case though, as I guess the problem comes from a difference in behavior between InternalRow and StructInternalRow, and adding isNullAt might have a larger performance penalty in production.

The behavior difference comes from calling internalRow.get(), which can return null. The NPE eventually comes from struct.get(index, types[index]) in SparkParquetWriters. While an actual InternalRow can return null for a null column, StructInternalRow assumes the underlying get() returns non-null and sometimes performs actions on the value that can lead to an NPE, e.g. calling toString for conversion or returning it as a primitive type directly. Thus SparkParquetWriters was able to call get() fine under normal circumstances, but it couldn't for this specific usage of StructInternalRow in the test.

For 2, that's a better idea, will do this instead.

Contributor:

For 1, let's just add the check. This isn't used in a performance-critical path.

Contributor:

This looks good to me.

@@ -28,4 +30,7 @@
List<TripleWriter<?>> columns();

void setColumnStore(ColumnWriteStore columnStore);

Stream<FieldMetrics> metrics();
Contributor:

Documentation needed. And is the name a bit too generic?

Contributor Author:

I'll add documentation. For the name, I think ParquetValueWriter already implies that it's field-specific, so I guess metrics itself should convey the idea that these are field-specific metrics. Do you have a better suggestion?

assertBounds(1, IntegerType.get(), null, null, metrics);
}

@Test
public void testMetricsForNaNColumns() throws IOException {
@rdblue (Contributor) commented Nov 1, 2020:

There are a few cases to consider with NaN values because comparison with NaN is always false.

Here are a couple of implementations that have issues because they use comparison without checking for NaN:

// max is NaN for values [NaN, 1.0, 1.1]
Float max = null;
for (Float value : values) {
  if (max == null || max < value) {
    max = value;
  }
}

// max ends up NaN when the last value is NaN
Float max = null;
for (Float value : values) {
  max = (max != null && max >= value) ? max : value;
}

Because the failure cases are different, I think we should test a few different cases:

  • A column starts with NaN
  • A column contains NaN in the middle
  • A column ends with NaN
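
For contrast, a NaN-safe variant (a quick sketch, not code from this PR) skips NaN before comparing and counts it separately:

// Sketch only: NaN never participates in the comparison, so the bound stays correct
// regardless of where NaN appears in the values.
static Float nanSafeMax(Iterable<Float> values) {
  Float max = null;
  for (Float value : values) {
    if (value.isNaN()) {
      continue;                // NaN is counted by the NaN counter, not used as a bound
    }
    if (max == null || value > max) {
      max = value;
    }
  }
  return max;                  // null if every value was NaN
}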

Contributor Author:

Thanks for pointing this out! I guess this tests the bounds more than the NaN count, but it will be very helpful when we start to exclude NaN from the upper/lower bounds. Will update.


return fieldMetrics
    .filter(metrics -> {
      String alias = inputSchema.idToAlias(metrics.getId());
Contributor:

Alias isn't what you want to use here. An alias is the file schema's name when an Iceberg schema is converted from a file schema. Parquet and Avro don't allow special characters in field names, or a column's name may have changed after a file is written. In both cases, a file schema's names won't match the schema. The alias map exposes the original file field names for when we need to use them (e.g., get a page reader for a column from the file).

In this case, we want to use the table schema's name, not a file schema's name. That's why we use findColumnName above. That should work here as well.

Contributor Author:

Thank you for the explanation!


@Override
public Stream<FieldMetrics> metrics() {
  if (id != null) {
Contributor:

I think this should always produce the metric. Field IDs are required to write, so we are guaranteed that they are always present (or should fail if one is not). And as long as this is always gathering the metric, we may as well return it.
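Roughly like this (a sketch; the FieldMetrics constructor shape here is assumed, not taken from the PR):

// Sketch: field ids are required at write time, so return the gathered metric unconditionally.
// Assumes java.util.stream.Stream is imported and nanValueCount is tracked by the writer.
@Override
public Stream<FieldMetrics> metrics() {
  return Stream.of(new FieldMetrics(id, nanValueCount));
}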

}
}

private static class DoubleWriter extends UnboxedWriter<Double> {
Contributor:

Same comments as for the float case above.

* It shouldn't be used in production; {@link org.apache.iceberg.parquet.ParquetWriter} is a better alternative.
* @deprecated use {@link org.apache.iceberg.parquet.ParquetWriter}
*/
@Deprecated
Contributor:

+1

@yyanyy marked this pull request as ready for review November 5, 2020 02:10

@yyanyy (Contributor, Author) commented Nov 5, 2020:

Thank you for all the comments! I think I have addressed all the feedback and rebased the change. I have also removed the draft tag from the PR.

@yyanyy yyanyy requested a review from rdblue November 5, 2020 02:15

public abstract InputFile writeRecords(Schema schema, Record... records) throws IOException;
public abstract Metrics getMetrics(Schema schema, Record... records) throws IOException;
Contributor:

This refactor seems to have introduced a lot of changes. Is it needed? Seems like it may just introduce conflicts.

Contributor Author:

I think the main reason for the refactoring is that, before this change, we had writeRecords create an appender and return an InputFile, and then in Parquet-specific tests we used ParquetUtil.fileMetrics to read metrics directly from the InputFile's footer. But now that NaN is tracked during writing, to test NaN we need to test against appender.metrics().
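The testing pattern then looks roughly like this (names are illustrative, not the PR's exact test code):

// Sketch: collect Metrics from the appender itself rather than re-reading the Parquet footer.
// newAppender(schema) is a hypothetical helper that builds a FileAppender<Record>.
FileAppender<Record> appender = newAppender(schema);
try {
  appender.addAll(records);
} finally {
  appender.close();
}
Metrics metrics = appender.metrics();   // NaN counts are available here after close()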

Contributor:

Makes sense! Looks like we need to keep it then.

@rdblue (Contributor) commented Nov 11, 2020:

@yyanyy, thank you for the update! Nothing major to fix. Overall the changes look good, but I found a couple of minor things that might reduce the size of this PR. Thanks!

// behaviors differ due to their implementation of comparison being different.
if (fileFormat() == FileFormat.ORC) {
  assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics);
  assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics);
Contributor:

NaN as an upper bound should be safe, but NaN as a lower bound may not be. Does this mean we need to fix our evaluators to check for NaN?

Contributor:

@omalley and @shardulm94, FYI. Looks like we are getting unexpected bounds for some ORC cases with NaN.

Contributor Author:

I guess this means that today we may skip an ORC file for predicates that use bounds when the column being evaluated contains non-NaN data but both the upper and lower bounds are NaN, which happens when the field of the first record in the file is NaN. Is my understanding correct? I can create an issue about this.

Contributor:

Yeah, an issue with a test case would be great! Thank you!

Contributor Author:

Forgot to do that yesterday: #1761

@rdblue (Contributor) commented Nov 12, 2020:

Thanks for the update, @yyanyy! I'll merge this.

@rdblue rdblue merged commit 944a437 into apache:master Nov 12, 2020