Commit 218240e

[HUDI-4729] Fix file group in pending compaction cannot be queried when querying _ro table (#6516)

A file group in pending compaction cannot be queried when querying the _ro (read-optimized) table with Spark. This commit fixes that.
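
For context, the read path this change targets is a Spark read-optimized query against a merge-on-read table. The sketch below is not part of the commit; it assumes the Hudi Spark bundle is on the classpath, and the object name, app name, and table path are placeholders. The query option mirrors the one used by the new test added in this commit.

import org.apache.spark.sql.SparkSession

object ReadOptimizedQueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-ro-query-sketch")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // Placeholder path to an existing Hudi merge-on-read table.
    val tablePath = "/tmp/hudi/test_table"

    // Read-optimized query: only base (parquet) files are scanned; log files are skipped.
    val roDf = spark.read.format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "read_optimized")
      .load(tablePath)

    // Before this fix, rows in file groups with a pending compaction could be missing from this count.
    println(roDf.count())

    spark.stop()
  }
}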

Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com>
Co-authored-by: Sagar Sumit <[email protected]>
3 people authored and yuzhaojing committed on Sep 29, 2022
1 parent 8c7dca2 commit 218240e
Showing 4 changed files with 116 additions and 31 deletions.

@@ -68,7 +68,6 @@
  * </ul>
  */
 public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
-
   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
 
   private final String[] partitionColumns;
@@ -153,6 +153,10 @@ public Stream<FileSlice> getAllFileSlices() {
     return Stream.empty();
   }
 
+  public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
+    return fileSlices.values().stream().filter(slice -> compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, maxInstantTime));
+  }
+
   /**
    * Gets the latest slice - this can contain either.
    * <p>
@@ -413,18 +413,21 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
    * base-files.
    *
    * @param fileSlice File Slice
+   * @param includeEmptyFileSlice include empty file-slice
    */
-  protected FileSlice filterBaseFileAfterPendingCompaction(FileSlice fileSlice) {
+  protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
       LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
       // Base file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
-      FileSlice transformed =
-          new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+      FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
       fileSlice.getLogFiles().forEach(transformed::addLogFile);
-      return transformed;
+      if (transformed.isEmpty() && !includeEmptyFileSlice) {
+        return Stream.of();
+      }
+      return Stream.of(transformed);
     }
-    return fileSlice;
+    return Stream.of(fileSlice);
   }
 
   protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
@@ -606,9 +609,9 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
       return fetchLatestFileSlices(partitionPath)
-          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
-          .map(this::filterBaseFileAfterPendingCompaction)
-          .map(this::addBootstrapBaseFileIfPresent);
+          .filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
+          .flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
+          .map(this::addBootstrapBaseFileIfPresent);
     } finally {
       readLock.unlock();
     }
@@ -627,7 +630,10 @@ public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fileId) {
         return Option.empty();
       } else {
         Option<FileSlice> fs = fetchLatestFileSlice(partitionPath, fileId);
-        return fs.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        if (!fs.isPresent()) {
+          return Option.empty();
+        }
+        return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
       }
     } finally {
       readLock.unlock();
@@ -665,13 +671,21 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr, String maxCommitTime, boolean includeFileSlicesInPendingCompaction) {
       readLock.lock();
       String partitionPath = formatPartitionKey(partitionStr);
       ensurePartitionLoadedCorrectly(partitionPath);
-      Stream<FileSlice> fileSliceStream = fetchLatestFileSlicesBeforeOrOn(partitionPath, maxCommitTime)
-          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime));
+      Stream<Stream<FileSlice>> allFileSliceStream = fetchAllStoredFileGroups(partitionPath)
+          .filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
+          .map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
       if (includeFileSlicesInPendingCompaction) {
-        return fileSliceStream.map(this::filterBaseFileAfterPendingCompaction).map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
+            .map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
+            .map(this::addBootstrapBaseFileIfPresent);
       } else {
-        return fileSliceStream.filter(fs -> !isPendingCompactionScheduledForFileId(fs.getFileGroupId()))
-            .map(this::addBootstrapBaseFileIfPresent);
+        return allFileSliceStream
+            .map(sliceStream ->
+                Option.fromJavaOptional(sliceStream
+                    .filter(slice -> !isPendingCompactionScheduledForFileId(slice.getFileGroupId()))
+                    .filter(slice -> !slice.isEmpty())
+                    .findFirst()))
+            .filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
       }
     } finally {
       readLock.unlock();
@@ -893,7 +907,6 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingCompactio
    */
   abstract Stream<BootstrapBaseFileMapping> fetchBootstrapBaseFiles();
 
-
   /**
    * Checks if partition is pre-loaded and available in store.
    *
@@ -967,7 +980,7 @@ Stream<FileSlice> fetchLatestFileSliceInRange(List<String> commitsToReturn) {
    */
   Stream<FileSlice> fetchAllFileSlices(String partitionPath) {
     return fetchAllStoredFileGroups(partitionPath).map(this::addBootstrapBaseFileIfPresent)
-        .map(HoodieFileGroup::getAllFileSlices).flatMap(sliceList -> sliceList);
+        .flatMap(HoodieFileGroup::getAllFileSlices);
   }
 
   /**
@@ -1003,8 +1016,7 @@ private Stream<HoodieBaseFile> fetchLatestBaseFiles() {
    * @param partitionPath partition-path
    */
   Stream<HoodieBaseFile> fetchAllBaseFiles(String partitionPath) {
-    return fetchAllStoredFileGroups(partitionPath).map(HoodieFileGroup::getAllBaseFiles)
-        .flatMap(baseFileList -> baseFileList);
+    return fetchAllStoredFileGroups(partitionPath).flatMap(HoodieFileGroup::getAllBaseFiles);
   }
 
   /**
@@ -1023,18 +1035,6 @@ Stream<FileSlice> fetchLatestFileSlices(String partitionPath) {
         .map(Option::get);
   }
 
-  /**
-   * Default implementation for fetching latest file-slices for a partition path as of instant.
-   *
-   * @param partitionPath Partition Path
-   * @param maxCommitTime Instant Time
-   */
-  Stream<FileSlice> fetchLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime) {
-    return fetchAllStoredFileGroups(partitionPath)
-        .map(fileGroup -> fileGroup.getLatestFileSliceBeforeOrOn(maxCommitTime)).filter(Option::isPresent)
-        .map(Option::get);
-  }
-
   /**
    * Helper to merge last 2 file-slices. These 2 file-slices do not have compaction done yet.
    *
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi

class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
  test("Test Query Merge_On_Read Read_Optimized table") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | partitioned by (ts)
           | location '$tablePath'
           | tblproperties (
           | type = 'mor',
           | primaryKey = 'id',
           | preCombineField = 'ts'
           | )
         """.stripMargin)
      // insert data to table
      spark.sql("set hoodie.parquet.max.file.size = 10000")
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
      spark.sql(s"update $tableName set price = 11 where id = 1")
      spark.sql(s"update $tableName set price = 21 where id = 2")
      spark.sql(s"update $tableName set price = 31 where id = 3")
      spark.sql(s"update $tableName set price = 41 where id = 4")

      // expect that all complete parquet files can be scanned
      assertQueryResult(4, tablePath)

      // async schedule compaction job
      spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
        .collect()

      // expect that all complete parquet files can be scanned with a pending compaction job
      assertQueryResult(4, tablePath)

      spark.sql(s"insert into $tableName values(5, 'a5', 10, 1000)")

      // expect that all complete parquet files can be scanned with a pending compaction job
      assertQueryResult(5, tablePath)

      // async run compaction job
      spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
        .collect()

      // assert that all complete parquet files can be scanned after compaction
      assertQueryResult(5, tablePath)
    }
  }

  def assertQueryResult(expected: Any,
                        tablePath: String): Unit = {
    val actual = spark.read.format("org.apache.hudi")
      .option("hoodie.datasource.query.type", "read_optimized")
      .load(tablePath)
      .count()
    assertResult(expected)(actual)
  }
}
