diff --git a/integration_tests/src/main/python/iceberg_test.py b/integration_tests/src/main/python/iceberg_test.py
index f073ad51489..3b3f83c6deb 100644
--- a/integration_tests/src/main/python/iceberg_test.py
+++ b/integration_tests/src/main/python/iceberg_test.py
@@ -571,3 +571,25 @@ def setup_iceberg_table(spark):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
         conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
+
+
+@iceberg
+@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
+@pytest.mark.parametrize('reader_type', rapids_reader_types)
+def test_iceberg_parquet_read_from_url_encoded_path(spark_tmp_table_factory, reader_type):
+    table = spark_tmp_table_factory.get()
+    tmp_view = spark_tmp_table_factory.get()
+    partition_gen = StringGen(pattern="(.|\n){1,10}", nullable=False)\
+        .with_special_case('%29%3EtkiudF4%3C', 1000)\
+        .with_special_case('%2F%23_v9kRtI%27', 1000)\
+        .with_special_case('aK%2BAgI%21l8%3E', 1000)\
+        .with_special_case('p%2Cmtx%3FCXMd', 1000)
+    def setup_iceberg_table(spark):
+        df = two_col_df(spark, long_gen, partition_gen).sortWithinPartitions('b')
+        df.createOrReplaceTempView(tmp_view)
+        spark.sql("CREATE TABLE {} USING ICEBERG PARTITIONED BY (b) AS SELECT * FROM {}"
+                  .format(table, tmp_view))
+    with_cpu_session(setup_iceberg_table)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.sql("SELECT * FROM {}".format(table)),
+        conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
index 0fb8867d27a..a339043c5de 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuMultiFileBatchReader.java
@@ -114,7 +114,7 @@ class GpuMultiFileBatchReader extends BaseDataReader {
       this.nameMapping = NameMappingParser.fromJson(nameMapping);
     }
     files = Maps.newLinkedHashMapWithExpectedSize(task.files().size());
-    task.files().forEach(fst -> this.files.putIfAbsent(fst.file().path().toString(), fst));
+    task.files().forEach(fst -> this.files.putIfAbsent(toEncodedPathString(fst), fst));
   }

   @Override
@@ -190,6 +190,15 @@ private Schema requiredSchema(GpuDeleteFilter deleteFilter) {
     }
   }

+  /**
+   * The multi-file readers expect the path string to be encoded and URL safe.
+   * This conversion performs that encoding, because Iceberg returns the raw
+   * data path via the `file().path()` call.
+   */
+  private String toEncodedPathString(FileScanTask fst) {
+    return new Path(fst.file().path().toString()).toUri().toString();
+  }
+
   static class FilteredParquetFileInfo {
     private final ParquetFileInfoWithBlockMeta parquetBlockMeta;
     private final Map idToConstant;
@@ -254,7 +263,7 @@ protected MultiFileBatchReaderBase() {
       final InternalRow emptyPartValue = InternalRow.empty();
       PartitionedFile[] pFiles = files.values().stream()
           .map(fst -> PartitionedFileUtilsShim.newPartitionedFile(emptyPartValue,
-              fst.file().path().toString(), fst.start(), fst.length()))
+              toEncodedPathString(fst), fst.start(), fst.length()))
           .toArray(PartitionedFile[]::new);
       rapidsReader = createRapidsReader(pFiles, emptyPartSchema);
     }
@@ -277,7 +286,8 @@ protected abstract FilePartitionReaderBase createRapidsReader(PartitionedFile[]
         StructType partitionSchema);

     /** The filter function for the Parquet multi-file reader */
-    protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
+    protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst,
+        String partFilePathString) {
       GpuDeleteFilter deleteFilter = deleteFilter(fst);
       if (deleteFilter != null) {
         throw new UnsupportedOperationException("Delete filter is not supported");
@@ -309,7 +319,9 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
          GpuParquetReader.addNullsForMissingFields(idToConstant, reorder.getMissingFields());

       ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
-          new Path(new URI(fst.file().path().toString())), clippedBlocks,
+          // The path conversion aligns with the one in the RAPIDS multi-file readers,
+          // so the file path of a PartitionedFile should be used here.
+          new Path(new URI(partFilePathString)), clippedBlocks,
           InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
           DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
           DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
@@ -354,9 +366,10 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
     }

     private ParquetFileInfoWithBlockMeta filterParquetBlocks(PartitionedFile file) {
-      FileScanTask fst = files.get(file.filePath());
-      FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
-      constsSchemaMap.put(file.filePath().toString(),
+      String partFilePathString = file.filePath().toString();
+      FileScanTask fst = files.get(partFilePathString);
+      FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
+      constsSchemaMap.put(partFilePathString,
           Tuple2.apply(filteredInfo.idToConstant(), filteredInfo.expectedSchema()));
       return filteredInfo.parquetBlockMeta();
     }
@@ -388,8 +401,10 @@ class ParquetCoalescingBatchReader extends MultiFileBatchReaderBase {
     protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
         StructType partitionSchema) {
       ArrayList<ParquetSingleDataBlockMeta> clippedBlocks = new ArrayList<>();
-      files.values().forEach(fst -> {
-        FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
+      Arrays.stream(pFiles).forEach(pFile -> {
+        String partFilePathString = pFile.filePath().toString();
+        FileScanTask fst = files.get(partFilePathString);
+        FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
         List<ParquetSingleDataBlockMeta> fileSingleMetas =
           JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
             .map(b -> ParquetSingleDataBlockMeta.apply(
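
Note (not part of the patch): the core of the change is that Iceberg's file().path() returns the raw, un-encoded file path, while the RAPIDS multi-file readers key and open files by the URL-encoded path string carried in Spark's PartitionedFile. The new toEncodedPathString helper bridges that gap by round-tripping the raw string through Hadoop's Path.toUri(), which percent-encodes special characters. The minimal sketch below illustrates that conversion under this assumption; the class name and sample path are made up for illustration only.

import org.apache.hadoop.fs.Path;

// Hypothetical standalone demo of the raw-vs-encoded path difference.
public class EncodedPathDemo {
  public static void main(String[] args) {
    // A partition directory containing URL-special characters, similar to the
    // special cases exercised by the new integration test.
    String rawPath = "/warehouse/tbl/data/b=%29%3EtkiudF4%3C/part-00000.parquet";

    Path hadoopPath = new Path(rawPath);

    // Path.toString() keeps the raw form, which is what Iceberg's
    // file().path() hands back (literal '%' characters and all).
    System.out.println("raw:     " + hadoopPath.toString());

    // Path.toUri() percent-encodes the path ('%' becomes "%25"), producing the
    // URL-safe form that PartitionedFile.filePath() carries, so both sides of
    // the files map lookup agree.
    System.out.println("encoded: " + hadoopPath.toUri().toString());
  }
}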