Encode the file path from Iceberg when converting to a PartitionedFile [databricks] (#9717)

* Encode the file path from Iceberg when converting to a PartitionedFile

This is needed because Iceberg always gives the raw data path, while the RAPIDS multi-file readers
expect the file path to be encoded and URL safe.

Signed-off-by: Firestarman <[email protected]>

* Address comments

Signed-off-by: Firestarman <[email protected]>

---------

Signed-off-by: Firestarman <[email protected]>
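
For illustration only (not part of the commit), here is a minimal sketch of the conversion this change relies on: Hadoop's `Path` builds a `java.net.URI` and percent-encodes characters that are not URL safe, so a raw Iceberg data path containing, say, a literal `%` or a space comes out in the encoded form the multi-file readers key on. The sample path and class name below are hypothetical.

```java
import org.apache.hadoop.fs.Path;

// Minimal sketch (hypothetical path and class name): mirrors the
// toEncodedPathString(fst) helper added in the diff below.
public class PathEncodingSketch {
  public static void main(String[] args) {
    // A raw partition directory containing a literal '%' and a space,
    // similar to the special-case partition values in the new test.
    String rawPath = "/warehouse/tbl/b=%29foo bar/part-00000.parquet";

    // Path builds a URI, which percent-encodes illegal characters:
    // '%' becomes "%25" and ' ' becomes "%20".
    String encoded = new Path(rawPath).toUri().toString();

    System.out.println(encoded);
    // Expected: /warehouse/tbl/b=%2529foo%20bar/part-00000.parquet
  }
}
```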
firestarman authored Nov 15, 2023
1 parent e4fdd84 commit 90789ba
Showing 2 changed files with 46 additions and 9 deletions.
22 changes: 22 additions & 0 deletions integration_tests/src/main/python/iceberg_test.py
@@ -571,3 +571,25 @@ def setup_iceberg_table(spark):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
        conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})


@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_parquet_read_from_url_encoded_path(spark_tmp_table_factory, reader_type):
    table = spark_tmp_table_factory.get()
    tmp_view = spark_tmp_table_factory.get()
    partition_gen = StringGen(pattern="(.|\n){1,10}", nullable=False)\
        .with_special_case('%29%3EtkiudF4%3C', 1000)\
        .with_special_case('%2F%23_v9kRtI%27', 1000)\
        .with_special_case('aK%2BAgI%21l8%3E', 1000)\
        .with_special_case('p%2Cmtx%3FCXMd', 1000)
    def setup_iceberg_table(spark):
        df = two_col_df(spark, long_gen, partition_gen).sortWithinPartitions('b')
        df.createOrReplaceTempView(tmp_view)
        spark.sql("CREATE TABLE {} USING ICEBERG PARTITIONED BY (b) AS SELECT * FROM {}"
                  .format(table, tmp_view))
    with_cpu_session(setup_iceberg_table)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.sql("SELECT * FROM {}".format(table)),
        conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
33 changes: 24 additions & 9 deletions GpuMultiFileBatchReader.java
@@ -114,7 +114,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
this.nameMapping = NameMappingParser.fromJson(nameMapping);
}
files = Maps.newLinkedHashMapWithExpectedSize(task.files().size());
task.files().forEach(fst -> this.files.putIfAbsent(fst.file().path().toString(), fst));
task.files().forEach(fst -> this.files.putIfAbsent(toEncodedPathString(fst), fst));
}

@Override
@@ -190,6 +190,15 @@ private Schema requiredSchema(GpuDeleteFilter deleteFilter) {
}
}

/**
 * Multi-file readers expect the path string to be encoded and URL safe,
 * but Iceberg returns the raw data path from the `file().path()` call.
 * So this conversion is leveraged to do the encoding.
 */
private String toEncodedPathString(FileScanTask fst) {
return new Path(fst.file().path().toString()).toUri().toString();
}

static class FilteredParquetFileInfo {
private final ParquetFileInfoWithBlockMeta parquetBlockMeta;
private final Map<Integer, ?> idToConstant;
@@ -254,7 +263,7 @@ protected MultiFileBatchReaderBase() {
final InternalRow emptyPartValue = InternalRow.empty();
PartitionedFile[] pFiles = files.values().stream()
.map(fst -> PartitionedFileUtilsShim.newPartitionedFile(emptyPartValue,
fst.file().path().toString(), fst.start(), fst.length()))
toEncodedPathString(fst), fst.start(), fst.length()))
.toArray(PartitionedFile[]::new);
rapidsReader = createRapidsReader(pFiles, emptyPartSchema);
}
@@ -277,7 +286,8 @@ protected abstract FilePartitionReaderBase createRapidsReader(PartitionedFile[]
StructType partitionSchema);

/** The filter function for the Parquet multi-file reader */
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst,
String partFilePathString) {
GpuDeleteFilter deleteFilter = deleteFilter(fst);
if (deleteFilter != null) {
throw new UnsupportedOperationException("Delete filter is not supported");
@@ -309,7 +319,9 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
GpuParquetReader.addNullsForMissingFields(idToConstant, reorder.getMissingFields());

ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
new Path(new URI(fst.file().path().toString())), clippedBlocks,
// The path conversion aligns with the one in the RAPIDS multi-file readers,
// so the file path of a PartitionedFile should be used here.
new Path(new URI(partFilePathString)), clippedBlocks,
InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
@@ -354,9 +366,10 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
}

private ParquetFileInfoWithBlockMeta filterParquetBlocks(PartitionedFile file) {
FileScanTask fst = files.get(file.filePath());
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
constsSchemaMap.put(file.filePath().toString(),
String partFilePathString = file.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
constsSchemaMap.put(partFilePathString,
Tuple2.apply(filteredInfo.idToConstant(), filteredInfo.expectedSchema()));
return filteredInfo.parquetBlockMeta();
}
@@ -388,8 +401,10 @@ class ParquetCoalescingBatchReader extends MultiFileBatchReaderBase {
protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
StructType partitionSchema) {
ArrayList<ParquetSingleDataBlockMeta> clippedBlocks = new ArrayList<>();
files.values().forEach(fst -> {
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
Arrays.stream(pFiles).forEach(pFile -> {
String partFilePathString = pFile.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
List<ParquetSingleDataBlockMeta> fileSingleMetas =
JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
.map(b -> ParquetSingleDataBlockMeta.apply(
