Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encode the file path from Iceberg when converting to a PartitionedFile [databricks] #9717

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions integration_tests/src/main/python/iceberg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,25 @@ def setup_iceberg_table(spark):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql("SELECT *, input_file_name() FROM {}".format(table)),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})


@iceberg
@ignore_order(local=True) # Iceberg plans with a thread pool and is not deterministic in file ordering
@pytest.mark.parametrize('reader_type', rapids_reader_types)
def test_iceberg_parquet_read_from_url_encoded_path(spark_tmp_table_factory, reader_type):
table = spark_tmp_table_factory.get()
tmp_view = spark_tmp_table_factory.get()
partition_gen = StringGen(pattern="(.|\n){1,10}", nullable=False)\
.with_special_case('%29%3EtkiudF4%3C', 1000)\
.with_special_case('%2F%23_v9kRtI%27', 1000)\
.with_special_case('aK%2BAgI%21l8%3E', 1000)\
.with_special_case('p%2Cmtx%3FCXMd', 1000)
def setup_iceberg_table(spark):
df = two_col_df(spark, long_gen, partition_gen).sortWithinPartitions('b')
df.createOrReplaceTempView(tmp_view)
spark.sql("CREATE TABLE {} USING ICEBERG PARTITIONED BY (b) AS SELECT * FROM {}"
.format(table, tmp_view))
with_cpu_session(setup_iceberg_table)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql("SELECT * FROM {}".format(table)),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class GpuMultiFileBatchReader extends BaseDataReader<ColumnarBatch> {
this.nameMapping = NameMappingParser.fromJson(nameMapping);
}
files = Maps.newLinkedHashMapWithExpectedSize(task.files().size());
task.files().forEach(fst -> this.files.putIfAbsent(fst.file().path().toString(), fst));
task.files().forEach(fst -> this.files.putIfAbsent(toEncodedPathString(fst), fst));
}

@Override
Expand Down Expand Up @@ -190,6 +190,15 @@ private Schema requiredSchema(GpuDeleteFilter deleteFilter) {
}
}

/**
* MultiFiles readers expect the path string is encoded and url safe.
* Here leverages this conversion to do this encoding because Iceberg
* gives the raw data path by `file().path()` call.
*/
private String toEncodedPathString(FileScanTask fst) {
return new Path(fst.file().path().toString()).toUri().toString();
winningsix marked this conversation as resolved.
Show resolved Hide resolved
}

static class FilteredParquetFileInfo {
private final ParquetFileInfoWithBlockMeta parquetBlockMeta;
private final Map<Integer, ?> idToConstant;
Expand Down Expand Up @@ -254,7 +263,7 @@ protected MultiFileBatchReaderBase() {
final InternalRow emptyPartValue = InternalRow.empty();
PartitionedFile[] pFiles = files.values().stream()
.map(fst -> PartitionedFileUtilsShim.newPartitionedFile(emptyPartValue,
fst.file().path().toString(), fst.start(), fst.length()))
toEncodedPathString(fst), fst.start(), fst.length()))
.toArray(PartitionedFile[]::new);
rapidsReader = createRapidsReader(pFiles, emptyPartSchema);
}
Expand All @@ -277,7 +286,8 @@ protected abstract FilePartitionReaderBase createRapidsReader(PartitionedFile[]
StructType partitionSchema);

/** The filter function for the Parquet multi-file reader */
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst,
String partFilePathString) {
GpuDeleteFilter deleteFilter = deleteFilter(fst);
if (deleteFilter != null) {
throw new UnsupportedOperationException("Delete filter is not supported");
Expand Down Expand Up @@ -309,7 +319,7 @@ protected FilteredParquetFileInfo filterParquetBlocks(FileScanTask fst) {
GpuParquetReader.addNullsForMissingFields(idToConstant, reorder.getMissingFields());

ParquetFileInfoWithBlockMeta parquetBlockMeta = ParquetFileInfoWithBlockMeta.apply(
new Path(new URI(fst.file().path().toString())), clippedBlocks,
new Path(new URI(partFilePathString)), clippedBlocks,
InternalRow.empty(), fileReadSchema, partReaderSparkSchema,
DateTimeRebaseCorrected$.MODULE$, // dateRebaseMode
DateTimeRebaseCorrected$.MODULE$, // timestampRebaseMode
Expand Down Expand Up @@ -354,9 +364,10 @@ protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
}

private ParquetFileInfoWithBlockMeta filterParquetBlocks(PartitionedFile file) {
FileScanTask fst = files.get(file.filePath());
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
constsSchemaMap.put(file.filePath().toString(),
String partFilePathString = file.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
constsSchemaMap.put(partFilePathString,
Tuple2.apply(filteredInfo.idToConstant(), filteredInfo.expectedSchema()));
return filteredInfo.parquetBlockMeta();
}
Expand Down Expand Up @@ -388,8 +399,10 @@ class ParquetCoalescingBatchReader extends MultiFileBatchReaderBase {
protected FilePartitionReaderBase createRapidsReader(PartitionedFile[] pFiles,
StructType partitionSchema) {
ArrayList<ParquetSingleDataBlockMeta> clippedBlocks = new ArrayList<>();
files.values().forEach(fst -> {
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst);
Arrays.stream(pFiles).forEach(pFile -> {
String partFilePathString = pFile.filePath().toString();
FileScanTask fst = files.get(partFilePathString);
FilteredParquetFileInfo filteredInfo = filterParquetBlocks(fst, partFilePathString);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's add some comments why we need to pass in partFilePathString other than using fst.file().path().toString() to get it.

#9697 (comment)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use toEncodedPathString(fst) instead of this partFilePathString here, but I don't want to do this encoding again since we already have the encoded path string in the PartitionedFile.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<ParquetSingleDataBlockMeta> fileSingleMetas =
JavaConverters.asJavaCollection(filteredInfo.parquetBlockMeta.blocks()).stream()
.map(b -> ParquetSingleDataBlockMeta.apply(
Expand Down