Commit 35d2884

Parquet: Support reading INT96 column in row group filter (apache#8988)
manuzhang authored and adnanhemani committed Jan 30, 2024
1 parent 3281053 commit 35d2884
Showing 5 changed files with 69 additions and 31 deletions.

File 1 of 5:
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.charset.StandardCharsets;
 import java.util.UUID;
 import java.util.function.Function;
@@ -112,6 +113,10 @@ static Function<Object, Object> converterFromParquet(PrimitiveType type) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
         return binary -> ByteBuffer.wrap(((Binary) binary).getBytes());
+      case INT96:
+        return binary ->
+            ParquetUtil.extractTimestampInt96(
+                ByteBuffer.wrap(((Binary) binary).getBytes()).order(ByteOrder.LITTLE_ENDIAN));
       default:
     }

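The new converter case delegates to ParquetUtil.extractTimestampInt96, which turns the 12-byte INT96 value into microseconds since the Unix epoch. A minimal sketch of that decoding, assuming the usual Impala-style layout (8 little-endian bytes of nanoseconds within the day followed by a 4-byte Julian day); the class and constant names here are illustrative, not Iceberg's actual implementation:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class Int96DecodeSketch {
  // Julian day number of the Unix epoch, 1970-01-01 (illustrative constant name)
  private static final long UNIX_EPOCH_JULIAN_DAY = 2_440_588L;
  private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

  // Decode a 12-byte INT96 buffer, already set to little-endian as in the
  // converter above, to microseconds since the Unix epoch.
  static long extractTimestampInt96(ByteBuffer buf) {
    long nanosOfDay = buf.getLong(); // bytes 0-7: nanoseconds within the day
    long julianDay = buf.getInt();   // bytes 8-11: Julian day number
    return (julianDay - UNIX_EPOCH_JULIAN_DAY) * MICROS_PER_DAY + nanosOfDay / 1_000L;
  }
}

This is why the converter forces ByteOrder.LITTLE_ENDIAN before handing the buffer over: Parquet stores INT96 little-endian, while a freshly wrapped ByteBuffer defaults to big-endian.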

File 2 of 5:
@@ -453,6 +453,9 @@ private <T> Set<T> dict(int id, Comparator<T> comparator) {
         case DOUBLE:
           dictSet.add((T) conversion.apply(dict.decodeToDouble(i)));
           break;
+        case INT96:
+          dictSet.add((T) conversion.apply(dict.decodeToBinary(i)));
+          break;
         default:
           throw new IllegalArgumentException(
               "Cannot decode dictionary of type: "
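Before this change an INT96 column fell through to the default branch and the dictionary filter threw "Cannot decode dictionary of type"; now each dictionary entry is decoded as a binary and run through the INT96 converter above. A hedged sketch of how such a decoded dictionary set can prune a row group (the method and parameters are illustrative, not the filter's actual API):

import java.util.function.Function;
import org.apache.parquet.column.Dictionary;

class DictPruneSketch {
  // Returns false when the literal is not among the dictionary's decoded
  // values, meaning the row group cannot contain a match and can be skipped.
  static <T> boolean mightContain(
      Dictionary dict, Function<Object, Object> fromParquet, T literal) {
    for (int i = 0; i <= dict.getMaxId(); i++) {
      // For INT96 this decodes the 12 raw bytes and converts them to epoch micros
      Object value = fromParquet.apply(dict.decodeToBinary(i));
      if (literal.equals(value)) {
        return true;
      }
    }
    return false;
  }
}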

File 3 of 5:
@@ -2156,22 +2156,32 @@ public void testTableWithInt96Timestamp() throws IOException {
             stagingLocation);

         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        Assertions.assertThat(actual)
-            .as("Rows must match")
-            .containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }

+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    Assertions.assertThat(actual)
+        .as("Rows must match")
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =
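The elided setup above this hunk writes parquet_table with an INT96 timestamp column, so each filtered read exercises the new INT96 path in the row group filters for one comparison operator. As a hedged sketch, not code from this commit, of how such input can be produced: Spark writes Parquet TIMESTAMP columns as INT96 when the standard spark.sql.parquet.outputTimestampType conf is set, and the table name and location below merely echo the test.

// Make Spark write Parquet timestamps in the legacy INT96 encoding
spark.conf().set("spark.sql.parquet.outputTimestampType", "INT96");
spark.sql(
    "CREATE TABLE parquet_table (tmp_col TIMESTAMP) USING parquet LOCATION '"
        + stagingLocation + "'");
spark.sql("INSERT INTO parquet_table VALUES (to_timestamp('2000-01-31 08:30:00'))");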

File 4 of 5:
@@ -2154,22 +2154,32 @@ public void testTableWithInt96Timestamp() throws IOException {
             stagingLocation);

         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        Assertions.assertThat(actual)
-            .as("Rows must match")
-            .containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }

+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    Assertions.assertThat(actual)
+        .as("Rows must match")
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =

File 5 of 5:
@@ -2181,20 +2181,30 @@ public void testTableWithInt96Timestamp() throws IOException {
             stagingLocation);

         // validate we get the expected results back
-        List<Row> expected = spark.table("parquet_table").select("tmp_col").collectAsList();
-        List<Row> actual =
-            spark
-                .read()
-                .format("iceberg")
-                .load(loadLocation(tableIdentifier))
-                .select("tmp_col")
-                .collectAsList();
-        assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+        testWithFilter("tmp_col < to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col <= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col == to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col > to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
+        testWithFilter("tmp_col >= to_timestamp('2000-01-31 08:30:00')", tableIdentifier);
         dropTable(tableIdentifier);
       }
     }
   }

+  private void testWithFilter(String filterExpr, TableIdentifier tableIdentifier) {
+    List<Row> expected =
+        spark.table("parquet_table").select("tmp_col").filter(filterExpr).collectAsList();
+    List<Row> actual =
+        spark
+            .read()
+            .format("iceberg")
+            .load(loadLocation(tableIdentifier))
+            .select("tmp_col")
+            .filter(filterExpr)
+            .collectAsList();
+    assertThat(actual).as("Rows must match").containsExactlyInAnyOrderElementsOf(expected);
+  }
+
   private GenericData.Record manifestRecord(
       Table manifestTable, Long referenceSnapshotId, ManifestFile manifest) {
     GenericRecordBuilder builder =