From 885619c0b64ff33e6de91f0785343b67c6f057fa Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jun 2022 22:11:51 -0700 Subject: [PATCH 01/27] Replaced string path w/ Hadoop's `Path` to avoid costly conversions in the hot-path --- .../main/java/org/apache/hudi/BaseHoodieTableFileIndex.java | 6 +++--- .../scala/org/apache/hudi/SparkHoodieTableFileIndex.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 0b2c34618ed6..e975f5340ccd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -131,7 +131,7 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext, this.shouldValidateInstant = shouldValidateInstant; this.tableType = metaClient.getTableType(); - this.basePath = metaClient.getBasePath(); + this.basePath = metaClient.getBasePathV2(); this.metaClient = metaClient; this.engineContext = engineContext; @@ -174,7 +174,7 @@ public int getFileSlicesCount() { protected List getAllQueryPartitionPaths() { List queryRelativePartitionPaths = queryPaths.stream() - .map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path)) + .map(path -> FSUtils.getRelativePartitionPath(basePath, path)) .collect(Collectors.toList()); // Load all the partition path from the basePath, and filter by the query partition path. @@ -353,7 +353,7 @@ public String getPath() { return path; } - Path fullPartitionPath(String basePath) { + Path fullPartitionPath(Path basePath) { if (!path.isEmpty()) { return new CachingPath(basePath, path); } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 4e70ebad75ee..1d32e1044514 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -269,7 +269,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, sparkParsePartitionUtil.parsePartition( partitionPath, typeInference = false, - Set(new Path(basePath)), + Set(basePath), partitionDataTypes, DateTimeUtils.getTimeZone(timeZoneId) ) From f2ad090012062409a673bb1e64c7ec9a1ca1432b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jun 2022 22:12:22 -0700 Subject: [PATCH 02/27] Avoid fetching `FileSystem.get` since it's unused; Tidying up --- .../hudi/common/table/log/block/HoodieHFileDataBlock.java | 3 +-- .../apache/hudi/metadata/HoodieBackedTableMetadata.java | 7 +------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 72cb3a0ef3b4..2edb78e31829 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -167,9 +167,8 @@ protected ClosableIterator deserializeRecords(byte[] content) thr // Get schema from the header Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - FileSystem fs 
= FSUtils.getFs(pathForReader.toString(), new Configuration()); // Read the content - HoodieHFileReader reader = new HoodieHFileReader<>(fs, pathForReader, content, Option.of(writerSchema)); + HoodieHFileReader reader = new HoodieHFileReader<>(null, pathForReader, content, Option.of(writerSchema)); Iterator recordIterator = reader.getRecordIterator(readerSchema); return new ClosableIterator() { @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index f8a0389da3d4..7b5b99258910 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -413,7 +413,7 @@ private Pair openReaders( // Open the log record scanner using the log files from the latest file slice List logFiles = slice.getLogFiles().collect(Collectors.toList()); Pair logRecordScannerOpenTimePair = - getLogRecordScanner(logFiles, partitionName); + getLogRecordScanner(logFiles, partitionName, Option.empty()); HoodieMetadataMergedLogRecordReader logRecordScanner = logRecordScannerOpenTimePair.getKey(); final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue(); @@ -465,11 +465,6 @@ private Set getValidInstantTimestamps() { return validInstantTimestamps; } - public Pair getLogRecordScanner(List logFiles, - String partitionName) { - return getLogRecordScanner(logFiles, partitionName, Option.empty()); - } - public Pair getLogRecordScanner(List logFiles, String partitionName, Option allowFullScanOverride) { From a6c3130d0a07c0efae7dd11714c7441d1e5303bc Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 24 Jun 2022 22:54:42 -0700 Subject: [PATCH 03/27] Cache `DateFormatter`/`TimestampFormatter` to avoid loading these t/h reflection in the hot-path --- .../datasources/SparkParsePartitionUtil.scala | 12 ++--- .../apache/spark/sql/hudi/SparkAdapter.scala | 2 +- .../hudi/SparkHoodieTableFileIndex.scala | 5 +- .../spark/sql/adapter/Spark2Adapter.scala | 2 +- .../Spark2ParsePartitionUtil.scala | 14 +++--- .../hudi/spark3/internal/ReflectUtil.java | 4 +- .../spark/sql/adapter/BaseSpark3Adapter.scala | 4 +- .../Spark3ParsePartitionUtil.scala | 48 +++++++++++-------- 8 files changed, 48 insertions(+), 43 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala index 626b3c6ef0d4..2279e5a13f6f 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType trait SparkParsePartitionUtil extends Serializable { - def parsePartition( - path: Path, - typeInference: Boolean, - basePaths: Set[Path], - userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): InternalRow + def parsePartition(path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone, + validatePartitionValues: Boolean = false): InternalRow } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index ab7d0164ea74..eaad4d471e0d 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -85,7 +85,7 @@ trait SparkAdapter extends Serializable { /** * Create the SparkParsePartitionUtil. */ - def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil + def getSparkParsePartitionUtil: SparkParsePartitionUtil /** * ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 1d32e1044514..e327499ebf75 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -79,7 +79,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) }) - private lazy val sparkParsePartitionUtil = sparkAdapter.createSparkParsePartitionUtil(spark.sessionState.conf) + private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil /** * Get the partition schema from the hoodie.properties. @@ -271,7 +271,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession, typeInference = false, Set(basePath), partitionDataTypes, - DateTimeUtils.getTimeZone(timeZoneId) + DateTimeUtils.getTimeZone(timeZoneId), + validatePartitionValues = spark.sessionState.conf.validatePartitionColumns ) .toSeq(partitionSchema) } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 7cd7271c6b50..91ab71ef1c3a 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -67,7 +67,7 @@ class Spark2Adapter extends SparkAdapter { ) } - override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = new Spark2ParsePartitionUtil + override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark2ParsePartitionUtil override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { throw new IllegalStateException(s"Should not call ParserInterface#parseMultipartIdentifier for spark2") diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala index c3cbcc407587..fe0bf50e6974 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala @@ -24,14 +24,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.types._ import org.apache.spark.sql.catalyst.InternalRow -class Spark2ParsePartitionUtil extends 
SparkParsePartitionUtil { +object Spark2ParsePartitionUtil extends SparkParsePartitionUtil { - override def parsePartition( - path: Path, - typeInference: Boolean, - basePaths: Set[Path], - userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): InternalRow = { + override def parsePartition(path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone, + validatePartitionValues: Boolean = false): InternalRow = { val (partitionValues, _) = PartitioningUtils.parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java index 1157a68254a8..d7a9a1f12241 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/ReflectUtil.java @@ -53,12 +53,12 @@ public static DateFormatter getDateFormatter(ZoneId zoneId) { try { ClassLoader loader = Thread.currentThread().getContextClassLoader(); if (HoodieSparkUtils.gteqSpark3_2()) { - Class clazz = loader.loadClass(DateFormatter.class.getName()); + Class clazz = loader.loadClass(DateFormatter.class.getName()); Method applyMethod = clazz.getDeclaredMethod("apply"); applyMethod.setAccessible(true); return (DateFormatter)applyMethod.invoke(null); } else { - Class clazz = loader.loadClass(DateFormatter.class.getName()); + Class clazz = loader.loadClass(DateFormatter.class.getName()); Method applyMethod = clazz.getDeclaredMethod("apply", ZoneId.class); applyMethod.setAccessible(true); return (DateFormatter)applyMethod.invoke(null, zoneId); diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 5ba976d36207..77df665b98de 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -50,9 +50,7 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { override def getAvroSchemaConverters: HoodieAvroSchemaConverters = HoodieSparkAvroSchemaConverters - override def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { - new Spark3ParsePartitionUtil(conf) - } + override def getSparkParsePartitionUtil: SparkParsePartitionUtil = Spark3ParsePartitionUtil override def parseMultipartIdentifier(parser: ParserInterface, sqlText: String): Seq[String] = { parser.parseMultipartIdentifier(sqlText) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala index f0cbe0530f3e..ebe92a5a32a9 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -17,57 +17,63 @@ package 
org.apache.spark.sql.execution.datasources -import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong} -import java.math.{BigDecimal => JBigDecimal} -import java.time.ZoneId -import java.util.{Locale, TimeZone} - import org.apache.hadoop.fs.Path - import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH import org.apache.hudi.spark3.internal.ReflectUtil - +import org.apache.hudi.util.JFunction import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.execution.datasources.PartitioningUtils.timestampPartitionPattern -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import java.lang.{Boolean => JBoolean, Double => JDouble, Long => JLong} +import java.math.{BigDecimal => JBigDecimal} +import java.time.ZoneId +import java.util +import java.util.concurrent.ConcurrentHashMap +import java.util.{Locale, TimeZone} +import scala.collection.convert.Wrappers.JConcurrentMapWrapper import scala.collection.mutable.ArrayBuffer import scala.util.Try import scala.util.control.NonFatal -class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { +object Spark3ParsePartitionUtil extends SparkParsePartitionUtil { + + private val cache = JConcurrentMapWrapper( + new ConcurrentHashMap[ZoneId, (DateFormatter, TimestampFormatter)](1)) /** * The definition of PartitionValues has been changed by SPARK-34314 in Spark3.2. * To solve the compatibility between 3.1 and 3.2, copy some codes from PartitioningUtils in Spark3.2 here. * And this method will generate and return `InternalRow` directly instead of `PartitionValues`. 
*/ - override def parsePartition( - path: Path, - typeInference: Boolean, - basePaths: Set[Path], - userSpecifiedDataTypes: Map[String, DataType], - timeZone: TimeZone): InternalRow = { - val dateFormatter = ReflectUtil.getDateFormatter(timeZone.toZoneId) - val timestampFormatter = TimestampFormatter(timestampPartitionPattern, - timeZone.toZoneId, isParsing = true) + override def parsePartition(path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + tz: TimeZone, + validatePartitionValues: Boolean = false): InternalRow = { + val (dateFormatter, timestampFormatter) = cache.getOrElseUpdate(tz.toZoneId, { + val dateFormatter = ReflectUtil.getDateFormatter(tz.toZoneId) + val timestampFormatter = TimestampFormatter(timestampPartitionPattern, tz.toZoneId, isParsing = true) + + (dateFormatter, timestampFormatter) + }) val (partitionValues, _) = parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, - conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter) + validatePartitionValues, tz.toZoneId, dateFormatter, timestampFormatter) partitionValues.map { case PartitionValues(columnNames: Seq[String], typedValues: Seq[TypedPartValue]) => val rowValues = columnNames.zip(typedValues).map { case (columnName, typedValue) => try { - castPartValueToDesiredType(typedValue.dataType, typedValue.value, timeZone.toZoneId) + castPartValueToDesiredType(typedValue.dataType, typedValue.value, tz.toZoneId) } catch { case NonFatal(_) => - if (conf.validatePartitionColumns) { + if (validatePartitionValues) { throw new RuntimeException(s"Failed to cast value `${typedValue.value}` to " + s"`${typedValue.dataType}` for partition column `$columnName`") } else null From c812871dad5946ea4fc596439e3e9f3e751ca411 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 27 Jun 2022 16:01:53 -0700 Subject: [PATCH 04/27] Avoid re-instantiating `HoodieTableMetadata` twice w/in `BaseHoodieTableFileIndex` --- .../apache/hudi/BaseHoodieTableFileIndex.java | 61 +++++++++++++++---- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index e975f5340ccd..9901c7da866e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -18,6 +18,8 @@ package org.apache.hudi; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -34,14 +36,15 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -63,7 +66,7 @@ *
 *   <li>Query instant/range</li>
  • * */ -public abstract class BaseHoodieTableFileIndex { +public abstract class BaseHoodieTableFileIndex implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class); @@ -94,6 +97,8 @@ public abstract class BaseHoodieTableFileIndex { private transient volatile HoodieTableFileSystemView fileSystemView = null; + private transient HoodieTableMetadata tableMetadata = null; + /** * @param engineContext Hudi engine-specific context * @param metaClient Hudi table's meta-client @@ -172,6 +177,11 @@ public int getFileSlicesCount() { .mapToInt(List::size).sum(); } + @Override + public void close() throws Exception { + resetTableMetadata(null); + } + protected List getAllQueryPartitionPaths() { List queryRelativePartitionPaths = queryPaths.stream() .map(path -> FSUtils.getRelativePartitionPath(basePath, path)) @@ -179,7 +189,7 @@ protected List getAllQueryPartitionPaths() { // Load all the partition path from the basePath, and filter by the query partition path. // TODO load files from the queryRelativePartitionPaths directly. - List matchedPartitionPaths = FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath) + List matchedPartitionPaths = getAllPartitionPathsUnchecked() .stream() .filter(path -> queryRelativePartitionPaths.stream().anyMatch(path::startsWith)) .collect(Collectors.toList()); @@ -244,12 +254,7 @@ private Map loadPartitionPathFiles() { ); fetchedPartitionToFiles = - FSUtils.getFilesInPartitions( - engineContext, - metadataConfig, - basePath, - fullPartitionPathsMapToFetch.keySet().toArray(new String[0]), - fileSystemStorageConfig.getSpillableDir()) + getAllFilesInPartitionsUnchecked(fullPartitionPathsMapToFetch.keySet()) .entrySet() .stream() .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue())); @@ -267,6 +272,11 @@ private Map loadPartitionPathFiles() { private void doRefresh() { long startTime = System.currentTimeMillis(); + HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); + + resetTableMetadata(newTableMetadata); + Map partitionFiles = loadPartitionPathFiles(); FileStatus[] allFiles = partitionFiles.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new); @@ -278,7 +288,7 @@ private void doRefresh() { // TODO we can optimize the flow by: // - First fetch list of files from instants of interest // - Load FileStatus's - fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles); + this.fileSystemView = new HoodieTableFileSystemView(metaClient, activeTimeline, allFiles); Option queryInstant = specifiedQueryInstant.or(() -> latestInstant.map(HoodieInstant::getTimestamp)); @@ -324,6 +334,22 @@ private void doRefresh() { LOG.info(String.format("Refresh table %s, spent: %d ms", metaClient.getTableConfig().getTableName(), duration)); } + private Map getAllFilesInPartitionsUnchecked(Collection fullPartitionPathsMapToFetch) { + try { + return tableMetadata.getAllFilesInPartitions(new ArrayList<>(fullPartitionPathsMapToFetch)); + } catch (IOException e) { + throw new HoodieIOException("Failed to list partition paths for a table", e); + } + } + + private List getAllPartitionPathsUnchecked() { + try { + return tableMetadata.getAllPartitionPaths(); + } catch (IOException e) { + throw new HoodieIOException("Failed to fetch partition paths for a table", e); + } + } + private void validate(HoodieTimeline activeTimeline, 
Option queryInstant) { if (shouldValidateInstant) { if (queryInstant.isPresent() && !activeTimeline.containsInstant(queryInstant.get())) { @@ -340,6 +366,17 @@ private static long fileSliceSize(FileSlice fileSlice) { return fileSlice.getBaseFile().map(BaseFile::getFileLen).orElse(0L) + logFileSize; } + private void resetTableMetadata(HoodieTableMetadata newTableMetadata) { + if (tableMetadata != null) { + try { + tableMetadata.close(); + } catch (Exception e) { + throw new HoodieException("Failed to close HoodieTableMetadata instance", e); + } + } + tableMetadata = newTableMetadata; + } + public static final class PartitionPath { final String path; final Object[] values; From 6c963234bfba23429132ca0d7d7b243efe16f43b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 27 Jun 2022 19:37:52 -0700 Subject: [PATCH 05/27] Fixing compilation --- .../apache/hudi/BaseHoodieTableFileIndex.java | 3 +-- .../table/log/block/HoodieHFileDataBlock.java | 23 ++++++++----------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 9901c7da866e..12b285f3cf6b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; @@ -83,7 +82,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable { private final boolean shouldValidateInstant; private final HoodieTableType tableType; - protected final String basePath; + protected final Path basePath; private final HoodieTableMetaClient metaClient; private final HoodieEngineContext engineContext; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 2edb78e31829..d923c592708c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -18,19 +18,6 @@ package org.apache.hudi.common.table.log.block; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.inline.InLineFSUtils; -import org.apache.hudi.common.fs.inline.InLineFileSystem; -import org.apache.hudi.common.util.ClosableIterator; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieHBaseKVComparator; -import org.apache.hudi.io.storage.HoodieHFileReader; - import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; @@ -43,6 +30,16 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hudi.avro.HoodieAvroUtils; +import 
org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieHBaseKVComparator; +import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; From a3945f2defbe1722d252fc6642e6683e033bb79c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 27 Jun 2022 16:03:50 -0700 Subject: [PATCH 06/27] Avoid creating costly Hadoop's `Path` object wherever possible, replacing it w/ - `CachingPath` object - Invoking more performant unsafe ctors/utils --- .../apache/hudi/BaseHoodieTableFileIndex.java | 6 ++- .../org/apache/hudi/common/fs/FSUtils.java | 13 +++-- .../apache/hudi/common/model/BaseFile.java | 9 ++++ .../view/AbstractTableFileSystemView.java | 13 ++--- .../org/apache/hudi/hadoop/CachingPath.java | 51 +++++++++++++++++++ .../hudi/metadata/BaseTableMetadata.java | 15 +++--- .../hudi/metadata/HoodieMetadataPayload.java | 11 +++- 7 files changed, 97 insertions(+), 21 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 12b285f3cf6b..7c7b01c3c58d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -55,6 +55,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe; + /** * Common (engine-agnostic) File Index implementation enabling individual query engines to * list Hudi Table contents based on the @@ -391,7 +393,9 @@ public String getPath() { Path fullPartitionPath(Path basePath) { if (!path.isEmpty()) { - return new CachingPath(basePath, path); + // NOTE: Since we know that the path is a proper relative path that doesn't require + // normalization, we create Hadoop's Path using more performant unsafe variant + return new CachingPath(basePath, createPathUnsafe(path)); } return new CachingPath(basePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d940f3bb4577..fe697197f20e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -34,6 +34,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.InvalidHoodiePathException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hadoop.conf.Configuration; @@ -68,6 +69,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority; + /** * Utility functions related to accessing the file storage. */ @@ -216,8 +219,8 @@ public static List getAllPartitionFoldersThreeLevelsDown(FileSystem fs, * Given a base partition and a partition path, return relative path of partition path to the base path. 
*/ public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath) { - basePath = Path.getPathWithoutSchemeAndAuthority(basePath); - fullPartitionPath = Path.getPathWithoutSchemeAndAuthority(fullPartitionPath); + basePath = getPathWithoutSchemeAndAuthority(basePath); + fullPartitionPath = getPathWithoutSchemeAndAuthority(fullPartitionPath); String fullPartitionPathStr = fullPartitionPath.toString(); @@ -607,12 +610,12 @@ public static Path getPartitionPath(String basePath, String partitionPath) { String properPartitionPath = partitionPath.startsWith("/") ? partitionPath.substring(1) : partitionPath; - return getPartitionPath(new Path(basePath), properPartitionPath); + return getPartitionPath(new CachingPath(basePath), properPartitionPath); } public static Path getPartitionPath(Path basePath, String partitionPath) { - // FOr non-partitioned table, return only base-path - return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath); + // For non-partitioned table, return only base-path + return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new CachingPath(basePath, partitionPath); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index cd35861b7499..fe9837e6c693 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; import java.io.Serializable; import java.util.Objects; @@ -66,6 +67,14 @@ public String getPath() { return fullPath; } + public Path getHadoopPath() { + if (fileStatus != null) { + return fileStatus.getPath(); + } + + return new CachingPath(fullPath); + } + public String getFileName() { return fileName; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index dc6fc47b5808..5818636caef2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -95,8 +95,8 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV private BootstrapIndex bootstrapIndex; - private String getPartitionPathFromFilePath(String fullPath) { - return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent()); + private String getPartitionPathFor(HoodieBaseFile baseFile) { + return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), baseFile.getHadoopPath().getParent()); } /** @@ -166,8 +166,8 @@ protected List buildFileGroups(FileStatus[] statuses, HoodieTim protected List buildFileGroups(Stream baseFileStream, Stream logFileStream, HoodieTimeline timeline, boolean addPendingCompactionFileSlice) { Map, List> baseFiles = - baseFileStream.collect(Collectors.groupingBy((baseFile) -> { - String partitionPathStr = getPartitionPathFromFilePath(baseFile.getPath()); + baseFileStream.collect(Collectors.groupingBy(baseFile -> { + String partitionPathStr = getPartitionPathFor(baseFile); return Pair.of(partitionPathStr, baseFile.getFileId()); })); @@ -183,7 +183,8 @@ protected List buildFileGroups(Stream baseFileS List fileGroups = new ArrayList<>(); 
fileIdSet.forEach(pair -> { String fileId = pair.getValue(); - HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline); + String partitionPath = pair.getKey(); + HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline); if (baseFiles.containsKey(pair)) { baseFiles.get(pair).forEach(group::addBaseFile); } @@ -373,7 +374,7 @@ private Stream convertFileStatusesToLogFiles(FileStatus[] statuse * @param baseFile base File */ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) { - final String partitionPath = getPartitionPathFromFilePath(baseFile.getPath()); + final String partitionPath = getPartitionPathFor(baseFile); Option> compactionWithInstantTime = getPendingCompactionOperationWithInstant(new HoodieFileGroupId(partitionPath, baseFile.getFileId())); diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index 01b3eb9d409b..caed2ecd4ac9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -19,10 +19,12 @@ package org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; +import org.apache.hudi.exception.HoodieException; import javax.annotation.concurrent.ThreadSafe; import java.io.Serializable; import java.net.URI; +import java.net.URISyntaxException; /** * This is an extension of the {@code Path} class allowing to avoid repetitive @@ -37,6 +39,7 @@ public class CachingPath extends Path implements Serializable { // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 + private volatile Path parent; private volatile String fileName; private volatile String fullPathStr; @@ -74,6 +77,17 @@ public String getName() { return fileName; } + @Override + public Path getParent() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (parent == null) { + parent = super.getParent(); + } + + return parent; + } + @Override public String toString() { // This value could be overwritten concurrently and that's okay, since @@ -83,4 +97,41 @@ public String toString() { } return fullPathStr; } + + public CachingPath subPath(String relativePath) { + return new CachingPath(this, createPathUnsafe(relativePath)); + } + + public static CachingPath wrap(Path path) { + if (path instanceof CachingPath) { + return (CachingPath) path; + } + + return new CachingPath(path.toUri()); + } + + /** + * TODO elaborate + */ + public static CachingPath createPathUnsafe(String relativePath) { + try { + // NOTE: {@code normalize} is going to be invoked by {@code Path} ctor, so there's no + // point in invoking it here + URI uri = new URI(null, null, relativePath, null, null); + return new CachingPath(uri); + } catch (URISyntaxException e) { + throw new HoodieException("Failed to instantiate relative path", e); + } + } + + /** + * TODO elaborate + */ + public static Path getPathWithoutSchemeAndAuthority(Path path) { + // This code depends on Path.toString() to remove the leading slash before + // the drive specification on Windows. + return path.isUriPathAbsolute() ? 
+ createPathUnsafe(path.toUri().getPath()) : + path; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 2036500ac656..0320a02b8d3b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -68,7 +69,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata { protected final transient HoodieEngineContext engineContext; protected final SerializableConfiguration hadoopConf; - protected final String dataBasePath; + protected final Path dataBasePath; protected final HoodieTableMetaClient dataMetaClient; protected final Option metrics; protected final HoodieMetadataConfig metadataConfig; @@ -83,7 +84,7 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataCon String dataBasePath, String spillableMapDirectory) { this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); - this.dataBasePath = dataBasePath; + this.dataBasePath = new CachingPath(dataBasePath); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build(); this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; @@ -113,7 +114,7 @@ public List getAllPartitionPaths() throws IOException { throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e); } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths(); } @@ -138,7 +139,7 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning()) + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) .getAllFilesInPartition(partitionPath); } @@ -154,7 +155,7 @@ public Map getAllFilesInPartitions(List partitions } } - return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning()) + return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning()) .getAllFilesInPartitions(partitions); } @@ -303,7 +304,7 @@ protected List fetchAllPartitionPaths() { * @param partitionPath The absolute path of the partition */ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath); + String partitionName = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); if (partitionName.isEmpty()) { partitionName = NON_PARTITIONED_NAME; } @@ -327,7 +328,7 @@ Map fetchAllFilesInPartitionPaths(List partitionPath Map partitionInfo = new HashMap<>(); boolean foundNonPartitionedPath = false; for (Path partitionPath: partitionPaths) { - String partitionName = 
FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath); + String partitionName = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); if (partitionName.isEmpty()) { if (partitionInfo.size() > 1) { throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index df138cd12497..6a93102d82cd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -53,6 +53,7 @@ import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.io.storage.HoodieHFileReader; import org.apache.hudi.util.Lazy; @@ -80,6 +81,7 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.common.util.ValidationUtils.checkState; +import static org.apache.hudi.hadoop.CachingPath.createPathUnsafe; import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier; import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal; @@ -475,8 +477,13 @@ public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath FileSystem fs = partitionPath.getFileSystem(hadoopConf); long blockSize = fs.getDefaultBlockSize(partitionPath); return filterFileInfoEntries(false) - .map(e -> new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0, - null, null, null, new Path(partitionPath, e.getKey()))) + .map(e -> { + // NOTE: Since we know that the Metadata Table's Payload is simply a file-name, we're + // creating Hadoop's Path using more performant unsafe variant + CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(e.getKey())); + return new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0, + null, null, null, filePath); + }) .toArray(FileStatus[]::new); } From 8bc2456b26f8dd4c0b2c60d37aed02fb7771b2b0 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 27 Jun 2022 16:04:26 -0700 Subject: [PATCH 07/27] Fixing compilation --- .../main/java/org/apache/hudi/BaseHoodieTableFileIndex.java | 1 + .../src/main/java/org/apache/hudi/hadoop/CachingPath.java | 6 +++--- .../org/apache/hudi/metadata/HoodieBackedTableMetadata.java | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 7c7b01c3c58d..aa423c214854 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -38,6 +38,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; diff --git 
a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index caed2ecd4ac9..077e3b2e50c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -130,8 +130,8 @@ public static CachingPath createPathUnsafe(String relativePath) { public static Path getPathWithoutSchemeAndAuthority(Path path) { // This code depends on Path.toString() to remove the leading slash before // the drive specification on Windows. - return path.isUriPathAbsolute() ? - createPathUnsafe(path.toUri().getPath()) : - path; + return path.isUriPathAbsolute() + ? createPathUnsafe(path.toUri().getPath()) + : path; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 7b5b99258910..2fc878d6d1e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -111,7 +111,7 @@ public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetada } private void initIfNeeded() { - this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath); + this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString()); if (!isMetadataTableEnabled) { if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) { LOG.info("Metadata table is disabled."); From 8e10cb89f6def61ba403a60bc3589fc129480b0e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 30 Jun 2022 11:04:00 -0700 Subject: [PATCH 08/27] Avoid loading defaults in Hadoop conf when init-ing HFile reader; Use `CachingPath` in `SparkHoodieTableFileIndex`; Tidying up; --- .../HoodieBackedTableMetadataWriter.java | 15 ++++++++++---- .../hudi/io/storage/HoodieHFileUtils.java | 4 +++- .../metadata/HoodieBackedTableMetadata.java | 2 -- .../hudi/SparkHoodieTableFileIndex.scala | 20 ++++++++++--------- pom.xml | 2 +- 5 files changed, 26 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index c7cc50967a48..b09ce344f466 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -554,7 +554,8 @@ private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, enabledPartitionTypes = this.enabledPartitionTypes; } initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - initialCommit(createInstantTime, enabledPartitionTypes); + // TODO REVERT + //initialCommit(createInstantTime, enabledPartitionTypes); updateInitializedPartitionsInTableConfig(enabledPartitionTypes); return true; } @@ -1006,21 +1007,27 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst // finish off any pending compactions if any from previous attempt. 
writeClient.runAnyPendingCompactions(); - String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() + // TODO revert + HoodieTimeline timeline = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); + if (timeline.empty()) { + return; + } + + String latestDeltaCommitTime = timeline.lastInstant() .get().getTimestamp(); List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList()); if (!pendingInstants.isEmpty()) { LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s", - pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray()))); + pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray()))); return; } // Trigger compaction with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. - final String compactionInstantTime = latestDeltacommitTime + "001"; + final String compactionInstantTime = latestDeltaCommitTime + "001"; if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) { writeClient.compact(compactionInstantTime); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java index 3767ea183257..878a3c563b6f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileUtils.java @@ -67,7 +67,9 @@ public static HFile.Reader createHFileReader( */ public static HFile.Reader createHFileReader( FileSystem fs, Path dummyPath, byte[] content) throws IOException { - Configuration conf = new Configuration(); + // Avoid loading default configs, from the FS, since this configuration is mostly + // used as a stub to initialize HFile reader + Configuration conf = new Configuration(false); HoodieHFileReader.SeekableByteArrayInputStream bis = new HoodieHFileReader.SeekableByteArrayInputStream(content); FSDataInputStream fsdis = new FSDataInputStream(bis); FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fsdis); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 2fc878d6d1e2..d4865875b166 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -303,8 +303,6 @@ private List>>> readFrom } } - List>>> result = new ArrayList<>(); - HoodieTimer readTimer = new HoodieTimer(); readTimer.startTimer(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index e327499ebf75..f95965460940 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -26,6 +26,8 @@ import 
org.apache.hudi.common.bootstrap.index.BootstrapIndex import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.hadoop.CachingPath +import org.apache.hudi.hadoop.CachingPath.createPathUnsafe import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging @@ -245,16 +247,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // HIVE_STYLE_PARTITIONING is disable. // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" val partitionWithName = - partitionFragments.zip(partitionColumns).map { - case (partition, columnName) => - if (partition.indexOf("=") == -1) { - s"${columnName}=$partition" - } else { - partition - } - }.mkString("/") + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") - val pathWithPartitionName = new Path(basePath, partitionWithName) + val pathWithPartitionName = new CachingPath(basePath, createPathUnsafe(partitionWithName)) val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) partitionValues.map(_.asInstanceOf[Object]).toArray diff --git a/pom.xml b/pom.xml index bd192f6846b4..8096c4887614 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,7 @@ 2.11.12 2.12.10 ${scala11.version} - 2.11 + 2.12 0.13 3.3.1 3.0.1 From 803facfb0bbb6a1e5f844273be5714f1470c11ce Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 30 Jun 2022 14:58:09 -0700 Subject: [PATCH 09/27] Short-circuit fetching partition path for non-partitioned tables (simply return ""); Use MT only for partitioned tables; for non-partitioned ones use FS listing (which is likely faster) --- .../apache/hudi/BaseHoodieTableFileIndex.java | 15 +++++++++++-- .../hudi/metadata/HoodieTableMetadata.java | 21 +++++++++++++++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index aa423c214854..3bf44de87c4b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -274,8 +274,15 @@ private Map loadPartitionPathFiles() { private void doRefresh() { long startTime = System.currentTimeMillis(); - HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); + HoodieTableMetadata newTableMetadata; + + // TODO make configurable + if (partitionColumns.length > 0) { + newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); + } else { + newTableMetadata = HoodieTableMetadata.createFSBackedTableMetadata(engineContext, metadataConfig, basePath.toString()); + } resetTableMetadata(newTableMetadata); @@ -346,6 +353,10 @@ private Map getAllFilesInPartitionsUnchecked(Collection getAllPartitionPathsUnchecked() { try { + if (partitionColumns.length == 0) { + return Collections.singletonList(""); + } + return tableMetadata.getAllPartitionPaths(); } catch (IOException e) { throw new 
HoodieIOException("Failed to fetch partition paths for a table", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index ae871e3be0c0..349c0efb482a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; - import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieMetadataException; @@ -107,13 +106,27 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapPath, boolean reuse) { if (metadataConfig.enabled()) { - return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); + return createHoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); } else { - return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()), - datasetBasePath, metadataConfig.shouldAssumeDatePartitioning()); + return createFSBackedTableMetadata(engineContext, metadataConfig, datasetBasePath); } } + static FileSystemBackedTableMetadata createFSBackedTableMetadata(HoodieEngineContext engineContext, + HoodieMetadataConfig metadataConfig, + String datasetBasePath) { + return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()), + datasetBasePath, metadataConfig.shouldAssumeDatePartitioning()); + } + + static HoodieBackedTableMetadata createHoodieBackedTableMetadata(HoodieEngineContext engineContext, + HoodieMetadataConfig metadataConfig, + String datasetBasePath, + String spillableMapPath, + boolean reuse) { + return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); + } + /** * Fetch all the files at the given partition path, per the latest snapshot of the metadata. 
*/ From fcc7e9526a39ebdc1fc1d045c323cc9375fafa3d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 30 Jun 2022 15:42:53 -0700 Subject: [PATCH 10/27] Cleaned up `BaseTableMetadata` --- .../hudi/metadata/BaseTableMetadata.java | 118 ++++++++++-------- 1 file changed, 65 insertions(+), 53 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 0320a02b8d3b..e84182de8910 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.util.hash.ColumnIndexID; import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hadoop.fs.FileStatus; @@ -56,8 +57,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.function.Function; import java.util.stream.Collectors; public abstract class BaseTableMetadata implements HoodieTableMetadata { @@ -279,20 +282,23 @@ public Map, HoodieMetadataColumnStats> getColumnStats(final */ protected List fetchAllPartitionPaths() { HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, + Option> recordOpt = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer())); - List partitions = Collections.emptyList(); - if (hoodieRecord.isPresent()) { - handleSpuriousDeletes(hoodieRecord, "\"all partitions\""); - partitions = hoodieRecord.get().getData().getFilenames(); - // Partition-less tables have a single empty partition - if (partitions.contains(NON_PARTITIONED_NAME)) { - partitions.remove(NON_PARTITIONED_NAME); - partitions.add(""); + List partitions = recordOpt.map(record -> { + HoodieMetadataPayload metadataPayload = record.getData(); + checkForSpuriousDeletes(metadataPayload, "\"all partitions\""); + + List relativePaths = metadataPayload.getFilenames(); + // Non-partitioned tables have a single empty partition + if (relativePaths.size() == 1 && relativePaths.get(0).equals(NON_PARTITIONED_NAME)) { + return Collections.singletonList(""); + } else { + return relativePaths; } - } + }) + .orElse(Collections.emptyList()); LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); return partitions; @@ -304,75 +310,81 @@ protected List fetchAllPartitionPaths() { * @param partitionPath The absolute path of the partition */ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String partitionName = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); - if (partitionName.isEmpty()) { - partitionName = NON_PARTITIONED_NAME; - } + String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); + String recordKey = relativePartitionPath.isEmpty() ? 
NON_PARTITIONED_NAME : relativePartitionPath; HoodieTimer timer = new HoodieTimer().startTimer(); - Option> hoodieRecord = getRecordByKey(partitionName, + Option> recordOpt = getRecordByKey(recordKey, MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - FileStatus[] statuses = {}; - if (hoodieRecord.isPresent()) { - handleSpuriousDeletes(hoodieRecord, partitionName); - statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath); - } + FileStatus[] statuses = recordOpt.map(record -> { + HoodieMetadataPayload metadataPayload = record.getData(); + checkForSpuriousDeletes(metadataPayload, recordKey); + return extractFileStatuses(partitionPath, metadataPayload); + }).orElse(new FileStatus[0]); - LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length); + LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length); return statuses; } Map fetchAllFilesInPartitionPaths(List partitionPaths) throws IOException { - Map partitionInfo = new HashMap<>(); - boolean foundNonPartitionedPath = false; - for (Path partitionPath: partitionPaths) { - String partitionName = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); - if (partitionName.isEmpty()) { - if (partitionInfo.size() > 1) { - throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); - } - partitionInfo.put(NON_PARTITIONED_NAME, partitionPath); - foundNonPartitionedPath = true; - } else { - if (foundNonPartitionedPath) { - throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table"); - } - partitionInfo.put(partitionName, partitionPath); - } - } + Map partitionIdToPathMap = + partitionPaths.parallelStream() + .collect( + Collectors.toMap(partitionPath -> { + String partitionId = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); + return partitionId.isEmpty() ? 
NON_PARTITIONED_NAME : partitionId; + }, Function.identity()) + ); HoodieTimer timer = new HoodieTimer().startTimer(); - List>>> partitionsFileStatus = - getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.getPartitionPath()); + List>>> partitionIdRecordPairs = + getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); - Map result = new HashMap<>(); - for (Pair>> entry: partitionsFileStatus) { - if (entry.getValue().isPresent()) { - handleSpuriousDeletes(entry.getValue(), entry.getKey()); - result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey()))); - } - } + Map partitionPathToFilesMap = partitionIdRecordPairs.parallelStream() + .map(pair -> { + String partitionId = pair.getKey(); + Option> recordOpt = pair.getValue(); + + Path partitionPath = partitionIdToPathMap.get(partitionId); + + return recordOpt.map(record -> { + HoodieMetadataPayload metadataPayload = record.getData(); + checkForSpuriousDeletes(metadataPayload, partitionId); + + FileStatus[] files = extractFileStatuses(partitionPath, metadataPayload); + return Pair.of(partitionPath.toString(), files); + }).orElse(null); + }) + .filter(Objects::nonNull) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray())); - return result; + + return partitionPathToFilesMap; + } + + private FileStatus[] extractFileStatuses(Path partitionPath, HoodieMetadataPayload metadataPayload) { + try { + return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to extract file-statuses from the payload", e); + } } /** * Handle spurious deletes. Depending on config, throw an exception or log a warn msg. - * @param hoodieRecord instance of {@link HoodieRecord} of interest. - * @param partitionName partition name of interest. */ - private void handleSpuriousDeletes(Option> hoodieRecord, String partitionName) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { + private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload, String partitionName) { + if (!metadataPayload.getDeletions().isEmpty()) { if (metadataConfig.ignoreSpuriousDeletes()) { LOG.warn("Metadata record for " + partitionName + " encountered some files to be deleted which was not added before. 
" + "Ignoring the spurious deletes as the `" + HoodieMetadataConfig.IGNORE_SPURIOUS_DELETES.key() + "` config is set to true"); } else { throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: " - + hoodieRecord.get().getData()); + + metadataPayload); } } } From 51f27e62a30bc8d967b4655bd46ead07214ab6ad Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 30 Jun 2022 15:50:52 -0700 Subject: [PATCH 11/27] Avoid looking up `FileSystem` for every partition when listing partitioned table, instead do it just once --- .../hudi/metadata/BaseTableMetadata.java | 29 +++++++++++-------- .../hudi/metadata/HoodieMetadataPayload.java | 7 +++++ 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index e84182de8910..2c522973ccf3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -19,6 +19,7 @@ package org.apache.hudi.metadata; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.avro.model.HoodieMetadataBloomFilter; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.common.bloom.BloomFilter; @@ -149,6 +150,10 @@ public FileStatus[] getAllFilesInPartition(Path partitionPath) @Override public Map getAllFilesInPartitions(List partitions) throws IOException { + if (partitions.isEmpty()) { + return Collections.emptyMap(); + } + if (isMetadataTableEnabled) { try { List partitionPaths = partitions.stream().map(Path::new).collect(Collectors.toList()); @@ -321,8 +326,13 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { FileStatus[] statuses = recordOpt.map(record -> { HoodieMetadataPayload metadataPayload = record.getData(); checkForSpuriousDeletes(metadataPayload, recordKey); - return extractFileStatuses(partitionPath, metadataPayload); - }).orElse(new FileStatus[0]); + try { + return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath); + } catch (IOException e) { + throw new HoodieIOException("Failed to extract file-statuses from the payload", e); + } + }) + .orElse(new FileStatus[0]); LOG.info("Listed file in partition from metadata: partition=" + relativePartitionPath + ", #files=" + statuses.length); return statuses; @@ -343,6 +353,8 @@ Map fetchAllFilesInPartitionPaths(List partitionPath getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath()); metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer())); + FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get()); + Map partitionPathToFilesMap = partitionIdRecordPairs.parallelStream() .map(pair -> { String partitionId = pair.getKey(); @@ -354,9 +366,10 @@ Map fetchAllFilesInPartitionPaths(List partitionPath HoodieMetadataPayload metadataPayload = record.getData(); checkForSpuriousDeletes(metadataPayload, partitionId); - FileStatus[] files = extractFileStatuses(partitionPath, metadataPayload); + FileStatus[] files = metadataPayload.getFileStatuses(fs, partitionPath); return Pair.of(partitionPath.toString(), files); - }).orElse(null); + }) + .orElse(null); }) .filter(Objects::nonNull) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); @@ -366,14 +379,6 @@ Map fetchAllFilesInPartitionPaths(List partitionPath return partitionPathToFilesMap; } - 
private FileStatus[] extractFileStatuses(Path partitionPath, HoodieMetadataPayload metadataPayload) { - try { - return metadataPayload.getFileStatuses(hadoopConf.get(), partitionPath); - } catch (IOException e) { - throw new HoodieIOException("Failed to extract file-statuses from the payload", e); - } - } - /** * Handle spurious deletes. Depending on config, throw an exception or log a warn msg. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 6a93102d82cd..c7b9b3499181 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -475,6 +475,13 @@ public Option getColumnStatMetadata() { */ public FileStatus[] getFileStatuses(Configuration hadoopConf, Path partitionPath) throws IOException { FileSystem fs = partitionPath.getFileSystem(hadoopConf); + return getFileStatuses(fs, partitionPath); + } + + /** + * Returns the files added as part of this record. + */ + public FileStatus[] getFileStatuses(FileSystem fs, Path partitionPath) { long blockSize = fs.getDefaultBlockSize(partitionPath); return filterFileInfoEntries(false) .map(e -> { From bfe5701e1ca232931dbe7c88a9b8b08023078bf2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 30 Jun 2022 16:50:05 -0700 Subject: [PATCH 12/27] `lint` --- .../main/java/org/apache/hudi/metadata/BaseTableMetadata.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 2c522973ccf3..2a51b881d670 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -303,7 +303,7 @@ protected List fetchAllPartitionPaths() { return relativePaths; } }) - .orElse(Collections.emptyList()); + .orElse(Collections.emptyList()); LOG.info("Listed partitions from metadata: #partitions=" + partitions.size()); return partitions; From 045c526ad8a057f0f71ad62cc67e8cc398b3d310 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 5 Jul 2022 12:44:13 -0700 Subject: [PATCH 13/27] Revert accidental change --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8096c4887614..bd192f6846b4 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,7 @@ 2.11.12 2.12.10 ${scala11.version} - 2.12 + 2.11 0.13 3.3.1 3.0.1 From 350c16e6c2fad6878ee99320eae16e33f99c024e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 10:28:14 -0700 Subject: [PATCH 14/27] Reverting experimental changes --- .../metadata/HoodieBackedTableMetadataWriter.java | 11 ++--------- .../org/apache/hudi/BaseHoodieTableFileIndex.java | 11 ++--------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index b09ce344f466..962875fb924f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -554,8 +554,7 @@ private boolean 
initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, enabledPartitionTypes = this.enabledPartitionTypes; } initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - // TODO REVERT - //initialCommit(createInstantTime, enabledPartitionTypes); + initialCommit(createInstantTime, enabledPartitionTypes); updateInitializedPartitionsInTableConfig(enabledPartitionTypes); return true; } @@ -1007,13 +1006,7 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst // finish off any pending compactions if any from previous attempt. writeClient.runAnyPendingCompactions(); - // TODO revert - HoodieTimeline timeline = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); - if (timeline.empty()) { - return; - } - - String latestDeltaCommitTime = timeline.lastInstant() + String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() .get().getTimestamp(); List pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() .findInstantsBefore(instantTime).getInstants().collect(Collectors.toList()); diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 3bf44de87c4b..3e20e44bfa96 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -274,15 +274,8 @@ private Map loadPartitionPathFiles() { private void doRefresh() { long startTime = System.currentTimeMillis(); - HoodieTableMetadata newTableMetadata; - - // TODO make configurable - if (partitionColumns.length > 0) { - newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), - FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); - } else { - newTableMetadata = HoodieTableMetadata.createFSBackedTableMetadata(engineContext, metadataConfig, basePath.toString()); - } + HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue()); resetTableMetadata(newTableMetadata); From 6508ce91ea6e0300aae32bf321c915b6db42b52b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 13:20:33 -0700 Subject: [PATCH 15/27] Fixing compilation (for Spark 2.4) --- .../apache/hudi/BaseHoodieTableFileIndex.java | 3 +- .../hudi/SparkHoodieTableFileIndex.scala | 35 ++++++++++--------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 3e20e44bfa96..8c5555e91528 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -40,7 +40,6 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.metadata.HoodieTableMetadata; -import org.apache.hudi.hadoop.CachingPath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -403,7 +402,7 @@ Path fullPartitionPath(Path basePath) { return new CachingPath(basePath, createPathUnsafe(path)); } - return new CachingPath(basePath); + return basePath; } @Override diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index f95965460940..a9a38f5f82bd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -20,7 +20,8 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} -import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption} +import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, shouldValidatePartitionColumns} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.bootstrap.index.BootstrapIndex import org.apache.hudi.common.config.TypedProperties @@ -41,6 +42,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ +import scala.language.implicitConversions /** * Implementation of the [[BaseHoodieTableFileIndex]] for Spark @@ -247,14 +249,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // HIVE_STYLE_PARTITIONING is disable. // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" val partitionWithName = - partitionFragments.zip(partitionColumns).map { - case (partition, columnName) => - if (partition.indexOf("=") == -1) { - s"${columnName}=$partition" - } else { - partition - } - }.mkString("/") + partitionFragments.zip(partitionColumns).map { + case (partition, columnName) => + if (partition.indexOf("=") == -1) { + s"${columnName}=$partition" + } else { + partition + } + }.mkString("/") val pathWithPartitionName = new CachingPath(basePath, createPathUnsafe(partitionWithName)) val partitionValues = parsePartitionPath(pathWithPartitionName, partitionSchema) @@ -274,20 +276,14 @@ class SparkHoodieTableFileIndex(spark: SparkSession, Set(basePath), partitionDataTypes, DateTimeUtils.getTimeZone(timeZoneId), - validatePartitionValues = spark.sessionState.conf.validatePartitionColumns + validatePartitionValues = shouldValidatePartitionColumns(spark) ) .toSeq(partitionSchema) } } -object SparkHoodieTableFileIndex { - implicit def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] = - if (opt.isDefined) { - org.apache.hudi.common.util.Option.of(opt.get) - } else { - org.apache.hudi.common.util.Option.empty() - } +object SparkHoodieTableFileIndex { /** * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding @@ -344,4 +340,9 @@ object SparkHoodieTableFileIndex { override def invalidate(): Unit = cache.invalidateAll() } } + + private def shouldValidatePartitionColumns(spark: SparkSession): Boolean = { + // NOTE: We can't use helper, method nor the config-entry to stay compatible w/ Spark 2.4 + spark.sessionState.conf.getConfString("spark.sql.sources.validatePartitionColumns", "true").toBoolean + } } From 2ffe68389f2f94d164d29c08d4fb63727d8828fb Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 14:34:25 -0700 Subject: [PATCH 16/27] 
Make `CachingPath` non-`Serializable`; Refactored `SerializablePath` to serialize the
 `URI` instead (to avoid re-parsing the path on deserialization); Tidying up

---
 .../main/java/org/apache/hudi/hadoop/CachingPath.java    | 3 +--
 .../java/org/apache/hudi/hadoop/SerializablePath.java    | 9 +++++----
 .../java/org/apache/hudi/metadata/BaseTableMetadata.java | 9 +++++----
 .../org/apache/hudi/metadata/HoodieMetadataPayload.java  | 2 +-
 4 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
index 077e3b2e50c1..2ff94f8628b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
@@ -22,7 +22,6 @@ import org.apache.hudi.exception.HoodieException;

 import javax.annotation.concurrent.ThreadSafe;

-import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;

@@ -34,7 +33,7 @@
  * NOTE: This class is thread-safe
  */
 @ThreadSafe
-public class CachingPath extends Path implements Serializable {
+public class CachingPath extends Path {

   // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all
   //       reads/writes to references are always atomic (including 64-bit JVMs)
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
index 5ad2307ef804..796600a7e838 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java
@@ -24,6 +24,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.Objects;

 /**
@@ -42,12 +43,12 @@ public Path get() {
   }

   private void writeObject(ObjectOutputStream out) throws IOException {
-    out.writeUTF(path.toString());
+    out.writeObject(path.toUri());
   }

-  private void readObject(ObjectInputStream in) throws IOException {
-    String pathStr = in.readUTF();
-    path = new CachingPath(pathStr);
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    URI uri = (URI) in.readObject();
+    path = new CachingPath(uri);
   }

   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 2a51b881d670..37a209b0a871 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.hadoop.SerializablePath;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

@@ -73,7 +74,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {

   protected final transient HoodieEngineContext engineContext;
   protected final SerializableConfiguration hadoopConf;
-  protected final Path dataBasePath;
+  protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
   protected final HoodieMetadataConfig metadataConfig;
@@ -88,7 +89,7 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                               String dataBasePath, String spillableMapDirectory) {
     this.engineContext =
engineContext; this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf()); - this.dataBasePath = new CachingPath(dataBasePath); + this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath)); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build(); this.spillableMapDirectory = spillableMapDirectory; this.metadataConfig = metadataConfig; @@ -315,7 +316,7 @@ protected List fetchAllPartitionPaths() { * @param partitionPath The absolute path of the partition */ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { - String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); + String relativePartitionPath = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath); String recordKey = relativePartitionPath.isEmpty() ? NON_PARTITIONED_NAME : relativePartitionPath; HoodieTimer timer = new HoodieTimer().startTimer(); @@ -343,7 +344,7 @@ Map fetchAllFilesInPartitionPaths(List partitionPath partitionPaths.parallelStream() .collect( Collectors.toMap(partitionPath -> { - String partitionId = FSUtils.getRelativePartitionPath(dataBasePath, partitionPath); + String partitionId = FSUtils.getRelativePartitionPath(dataBasePath.get(), partitionPath); return partitionId.isEmpty() ? NON_PARTITIONED_NAME : partitionId; }, Function.identity()) ); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index c7b9b3499181..057517780007 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -485,7 +485,7 @@ public FileStatus[] getFileStatuses(FileSystem fs, Path partitionPath) { long blockSize = fs.getDefaultBlockSize(partitionPath); return filterFileInfoEntries(false) .map(e -> { - // NOTE: Since we now that the Metadata Table's Payload is simply a file-name we're + // NOTE: Since we know that the Metadata Table's Payload is simply a file-name we're // creating Hadoop's Path using more performant unsafe variant CachingPath filePath = new CachingPath(partitionPath, createPathUnsafe(e.getKey())); return new FileStatus(e.getValue().getSize(), false, 0, blockSize, 0, 0, From de08ca35b75af26fd8ed1aa9e881520772ba51a1 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 11:55:01 -0700 Subject: [PATCH 17/27] Fixed tests --- .../hudi/client/functional/TestConsistentBucketIndex.java | 2 +- .../org/apache/hudi/testutils/HoodieClientTestHarness.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java index e0bc22f70d23..05617301936e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java @@ -97,7 +97,7 @@ private void setUp(boolean populateMetaFields, boolean partitioned) throws Excep initTestDataGenerator(new String[] {""}); } initFileSystem(); - Properties props = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + Properties props = getPropertiesForKeyGen(populateMetaFields); props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props); config = getConfigBuilder() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 80d185f62bc9..4d9569e00be8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -341,8 +341,12 @@ protected int incrementTimelineServicePortToUse() { } protected Properties getPropertiesForKeyGen() { + return getPropertiesForKeyGen(false); + } + + protected Properties getPropertiesForKeyGen(boolean populateMetaFields) { Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), populateMetaFields); properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); From f88229b003d1bfbe4be484f3dd9a9267471cb0d2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 12:20:08 -0700 Subject: [PATCH 18/27] Fixed invalid cast in `HoodieTableMetaClient` --- .../org/apache/hudi/common/table/HoodieTableMetaClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 529b0e8c99ed..5478c105a4a5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -398,7 +398,7 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) { public void validateTableProperties(Properties properties) { // Once meta fields are disabled, it cant be re-enabled for a given table. if (!getTableConfig().populateMetaFields() - && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()))) { + && Boolean.parseBoolean((String) properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(), HoodieTableConfig.POPULATE_META_FIELDS.defaultValue().toString()))) { throw new HoodieException(HoodieTableConfig.POPULATE_META_FIELDS.key() + " already disabled for the table. 
Can't be re-enabled back"); } From 068a1105340d31fff80d71a9c0ff1530659b9d15 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 12:48:08 -0700 Subject: [PATCH 19/27] Fixing tests w/ invalid configuration --- .../table/TestHoodieMergeOnReadTable.java | 26 ++++++++++++++++--- ...HoodieSparkMergeOnReadTableCompaction.java | 19 +++++++++++--- ...eSparkMergeOnReadTableIncrementalRead.java | 2 +- ...stHoodieSparkMergeOnReadTableRollback.java | 9 ++++--- .../testutils/HoodieClientTestHarness.java | 2 +- .../SparkClientFunctionalTestHarness.java | 12 ++++++--- .../hudi/common/util/CollectionUtils.java | 9 +++++++ .../common/testutils/HoodieTestUtils.java | 11 ++++---- .../sources/TestHoodieIncrSource.java | 4 +-- 9 files changed, 70 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index 0ce6ca0ee923..0b80d20b39c3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.Transformations; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -61,7 +62,6 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.storage.StorageLevel; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness private HoodieTableMetaClient metaClient; private HoodieTestDataGenerator dataGen; - @BeforeEach - void setUp() throws IOException { - Properties properties = new Properties(); + void setUp(Properties props) throws IOException { + Properties properties = CollectionUtils.copy(props); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); dataGen = new HoodieTestDataGenerator(); @@ -99,6 +98,9 @@ void setUp() throws IOException { @Test public void testMetadataAggregateFromWriteStatus() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { String newCommitTime = "001"; @@ -125,6 +127,9 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true); addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -213,6 +218,8 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig config = cfgBuilder.build(); + 
setUp(config.getProps()); + try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -302,6 +309,8 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY) .withAutoCommit(false).build(); + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient); @@ -381,6 +390,9 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep @Test public void testRollingStatsWithSmallFileHandling() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { Map fileIdToInsertsMap = new HashMap<>(); Map fileIdToUpsertsMap = new HashMap<>(); @@ -497,6 +509,9 @@ public void testRollingStatsWithSmallFileHandling() throws Exception { @Test public void testHandleUpdateWithMultiplePartitions() throws Exception { HoodieWriteConfig cfg = getConfig(true); + + setUp(cfg.getProps()); + try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { /** @@ -578,6 +593,9 @@ public void testReleaseResource() throws Exception { HoodieWriteConfig.Builder builder = getConfigBuilder(true); builder.withReleaseResourceEnabled(true); builder.withAutoCommit(false); + + setUp(builder.build().getProps()); + /** * Write 1 (test when RELEASE_RESOURCE_ENABLE is true) */ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java index 3b30c5b76736..f959a8f0d952 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java @@ -54,6 +54,7 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,8 +99,13 @@ public void testWriteDuringCompaction() throws IOException { .withLayoutConfig(HoodieLayoutConfig.newBuilder() .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build(); - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + + Properties props = getPropertiesForKeyGen(true); + props.putAll(config.getProps()); + + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); client = getHoodieWriteClient(config); // write data and commit @@ -138,8 +144,13 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en .withLayoutConfig(HoodieLayoutConfig.newBuilder() .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name()) .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build()) - 
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build(); - metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()) + .build(); + + Properties props = getPropertiesForKeyGen(true); + props.putAll(config.getProps()); + + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); client = getHoodieWriteClient(config); final List records = dataGen.generateInserts("001", 100); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java index 5df7b4daecc7..275fd32ca7d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java @@ -82,7 +82,7 @@ void setUp() { public void testIncrementalReadsWithCompaction() throws Exception { final String partitionPath = "2020/02/20"; // use only one partition for this test final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath }); - Properties props = new Properties(); + Properties props = getPropertiesForKeyGen(true); props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props); HoodieWriteConfig cfg = getConfigBuilder(true).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 2f0e585ec90a..0a11425ec5b8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCleanConfig; @@ -155,7 +156,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro addConfigsForPopulateMetaFields(cfgBuilder, true); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = new Properties(); + Properties properties = CollectionUtils.copy(cfg.getProps()); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); @@ -327,7 +328,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = populateMetaFields ? 
new Properties() : getPropertiesForKeyGen(); + Properties properties = getPropertiesForKeyGen(populateMetaFields); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); @@ -606,8 +607,10 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception { .withMarkersType(MarkerType.DIRECT.name()); HoodieWriteConfig cfg = cfgBuilder.build(); - Properties properties = new Properties(); + Properties properties = getPropertiesForKeyGen(true); + properties.putAll(cfg.getProps()); properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString()); + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties); try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 4d9569e00be8..468be5fb2640 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -346,7 +346,7 @@ protected Properties getPropertiesForKeyGen() { protected Properties getPropertiesForKeyGen(boolean populateMetaFields) { Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), populateMetaFields); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)); properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index c58dd178dca3..ba1afbebb252 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -160,7 +160,7 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin } public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException { - return getHoodieMetaClient(hadoopConf, basePath, new Properties()); + return getHoodieMetaClient(hadoopConf, basePath, getPropertiesForKeyGen(true)); } @Override @@ -310,8 +310,12 @@ protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOExcept } protected Properties getPropertiesForKeyGen() { + return getPropertiesForKeyGen(false); + } + + protected Properties getPropertiesForKeyGen(boolean populateMetaFields) { Properties properties = new Properties(); - properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false"); + properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields)); properties.put("hoodie.datasource.write.recordkey.field", "_row_key"); properties.put("hoodie.datasource.write.partitionpath.field", "partition_path"); 
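As the compaction-test hunks above show, the fix is to derive the table's key-generator configuration once and feed the same properties to both the meta-client and the writer. A condensed sketch of that pattern, using the harness helpers touched in this patch:

    // Sketch: seed the key-gen/table configs once, then layer the write config on top,
    // so POPULATE_META_FIELDS and the record-key/partition-path fields stay consistent
    // between table initialization and the write client.
    Properties props = getPropertiesForKeyGen(true); // populateMetaFields = true
    props.putAll(config.getProps());
    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
    client = getHoodieWriteClient(config);

Previously the meta-client was initialized from `config.getProps()` alone (or a fresh `Properties`), which could leave the table config and write config disagreeing about meta fields and key-generator fields.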
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key"); @@ -321,9 +325,9 @@ protected Properties getPropertiesForKeyGen() { } protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) { + configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields)); if (!populateMetaFields) { - configBuilder.withProperties(getPropertiesForKeyGen()) - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); + configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 8036995fab56..194d67cd0e3e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -54,6 +54,15 @@ public static boolean nonEmpty(Collection c) { return !isNullOrEmpty(c); } + /** + * Makes a copy of provided {@link Properties} object + */ + public static Properties copy(Properties props) { + Properties copy = new Properties(); + copy.putAll(props); + return copy; + } + /** * Returns last element of the array of {@code T} */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 6a2bffd34d6e..945796a2bdf6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -127,11 +127,12 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa Properties properties) throws IOException { properties = HoodieTableMetaClient.withPropertyBuilder() - .setTableName(RAW_TRIPS_TEST_NAME) - .setTableType(tableType) - .setPayloadClass(HoodieAvroPayload.class) - .fromProperties(properties) - .build(); + .setTableName(RAW_TRIPS_TEST_NAME) + .setTableType(tableType) + .setPayloadClass(HoodieAvroPayload.class) + .setPartitionFields("some_nonexistent_field") + .fromProperties(properties) + .build(); return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java index 57270bdf812d..0d090e1c4581 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java @@ -114,9 +114,9 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe if (expectedCount == 0) { assertFalse(batchCheckPoint.getKey().isPresent()); } else { - assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + assertEquals(expectedCount, batchCheckPoint.getKey().get().count()); } - Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint); + Assertions.assertEquals(expectedCheckpoint, batchCheckPoint.getRight()); } private Pair> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List insertRecords, String commit) throws IOException { From 3db948f5568e7d55a89dce98068a9acd6c7d4893 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 15:03:27 
-0700 Subject: [PATCH 20/27] Fixing more tests w/ invalid configuration --- .../common/testutils/HoodieTestUtils.java | 36 +++++++++---------- .../hadoop/testutils/InputFormatTestUtil.java | 5 ++- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index 945796a2bdf6..d3c1de56773b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -40,6 +40,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.UUID; @@ -124,29 +125,28 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, - Properties properties) - throws IOException { - properties = HoodieTableMetaClient.withPropertyBuilder() - .setTableName(RAW_TRIPS_TEST_NAME) - .setTableType(tableType) - .setPayloadClass(HoodieAvroPayload.class) - .setPartitionFields("some_nonexistent_field") - .fromProperties(properties) - .build(); - return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); + Properties properties) throws IOException { + return init(hadoopConf, basePath, tableType, properties, null); } public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType, Properties properties, String databaseName) throws IOException { - properties = HoodieTableMetaClient.withPropertyBuilder() - .setDatabaseName(databaseName) - .setTableName(RAW_TRIPS_TEST_NAME) - .setTableType(tableType) - .setPayloadClass(HoodieAvroPayload.class) - .fromProperties(properties) - .build(); - return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties); + HoodieTableMetaClient.PropertyBuilder builder = + HoodieTableMetaClient.withPropertyBuilder() + .setDatabaseName(databaseName) + .setTableName(RAW_TRIPS_TEST_NAME) + .setTableType(tableType) + .setPayloadClass(HoodieAvroPayload.class); + + String keyGen = properties.getProperty("hoodie.datasource.write.keygenerator.class"); + if (!Objects.equals(keyGen, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")) { + builder.setPartitionFields("some_nonexistent_field"); + } + + Properties processedProperties = builder.fromProperties(properties).build(); + + return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, processedProperties); } public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, HoodieFileFormat baseFileFormat) throws IOException { diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index db8002cd2d46..a4471845c3e4 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -86,7 +86,10 @@ public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFil baseFileFormat); } - java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01")); + java.nio.file.Path partitionPath = useNonPartitionedKeyGen 
+ ? basePath + : basePath.resolve(Paths.get("2016", "05", "01")); + setupPartition(basePath, partitionPath); if (injectData) { From 56d51765afd92e09d2c2bf16bf873edb3d5f2796 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 19:04:15 -0700 Subject: [PATCH 21/27] Fixed configs in IT to properly mirror configs for DS cases --- docker/demo/config/dfs-source.properties | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docker/demo/config/dfs-source.properties b/docker/demo/config/dfs-source.properties index ac7080e1412b..a90629ef8e67 100644 --- a/docker/demo/config/dfs-source.properties +++ b/docker/demo/config/dfs-source.properties @@ -19,6 +19,10 @@ include=base.properties # Key fields, for kafka example hoodie.datasource.write.recordkey.field=key hoodie.datasource.write.partitionpath.field=date +# NOTE: We have to duplicate configuration since this is being used +# w/ both Spark and DeltaStreamer +hoodie.table.recordkey.fields=key +hoodie.table.partition.fields=date # Schema provider props (change to absolute path based on your installation) hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc From 409066feea3265decac3d1010e1e4a1ddb5f05f4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 19:04:41 -0700 Subject: [PATCH 22/27] Tidying up --- .../java/org/apache/hudi/BaseHoodieTableFileIndex.java | 6 +----- .../main/java/org/apache/hudi/hadoop/CachingPath.java | 9 +++++++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index 8c5555e91528..ada5df3a8a95 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -67,13 +67,12 @@ *
 *   <li>Query instant/range</li>
 * </ul>
 */
 public abstract class BaseHoodieTableFileIndex implements AutoCloseable {

   private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);

   private final String[] partitionColumns;

-  private final FileSystemViewStorageConfig fileSystemStorageConfig;
   protected final HoodieMetadataConfig metadataConfig;

   private final HoodieTableQueryType queryType;
@@ -123,9 +122,6 @@ public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElse(new String[0]);

-    this.fileSystemStorageConfig = FileSystemViewStorageConfig.newBuilder()
-        .fromProperties(configProperties)
-        .build();
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
         .build();
diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
index 2ff94f8628b8..d6e35dbbdc5a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
+++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java
@@ -110,7 +110,11 @@ public static CachingPath wrap(Path path) {
   }

   /**
-   * TODO elaborate
+   * Creates a path based on the provided *relative* path.
+   *
+   * NOTE: This is an unsafe version that relies on the caller knowing what they are doing:
+   * it will not work with paths having a scheme (which requires parsing), and is only meant
+   * to work w/ relative paths in a few specific cases.
    */
   public static CachingPath createPathUnsafe(String relativePath) {
     try {
@@ -124,7 +128,8 @@ public static CachingPath createPathUnsafe(String relativePath) {
   }

   /**
-   * TODO elaborate
+   * This is the {@link Path#getPathWithoutSchemeAndAuthority(Path)} counterpart, instantiating
+   * a {@link CachingPath}.
    */
   public static Path getPathWithoutSchemeAndAuthority(Path path) {
     // This code depends on Path.toString() to remove the leading slash before

From 95ce817e050387177ba9620d33868eae1d04306c Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Thu, 28 Jul 2022 10:15:38 -0700
Subject: [PATCH 23/27] Carve out an exception for the metadata table (MT) to
 avoid treating it as a non-partitioned one

---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index ada5df3a8a95..5910de5f1d39 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -155,7 +155,7 @@ public Option<HoodieInstant> getLatestCompletedInstant() {
    * Returns table's base-path
    */
   public String getBasePath() {
-    return metaClient.getBasePath();
+    return basePath.toString();
   }

   /**
@@ -341,11 +341,7 @@ private Map<String, FileStatus[]> getAllFilesInPartitionsUnchecked(Collection<String>
   private List<String> getAllPartitionPathsUnchecked() {
     try {
-      if (partitionColumns.length == 0) {
-        return Collections.singletonList("");
-      }
-
-      return tableMetadata.getAllPartitionPaths();
+      return isPartitionedTable() ?
tableMetadata.getAllPartitionPaths() : Collections.singletonList(""); } catch (IOException e) { throw new HoodieIOException("Failed to fetch partition paths for a table", e); } @@ -378,7 +374,12 @@ private void resetTableMetadata(HoodieTableMetadata newTableMetadata) { tableMetadata = newTableMetadata; } + private boolean isPartitionedTable() { + return partitionColumns.length > 0 || HoodieTableMetadata.isMetadataTable(basePath.toString()); + } + public static final class PartitionPath { + final String path; final Object[] values; From d448c7496756570220e454059631c902501886f4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 2 Sep 2022 20:02:28 -0700 Subject: [PATCH 24/27] Fixed `SimpleKeyGenerator` to assert that configured primary-key and partition-path fields are non-empty --- .../java/org/apache/hudi/keygen/BuiltinKeyGenerator.java | 1 - .../java/org/apache/hudi/keygen/SimpleKeyGenerator.java | 7 +++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index d7561cc12659..ad71b17ce70f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER); protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER); - protected transient volatile SparkRowConverter rowConverter; protected transient volatile SparkRowAccessor rowAccessor; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index dcffdf3cdb83..8c43e19baaf5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -20,6 +20,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; @@ -46,6 +47,12 @@ public SimpleKeyGenerator(TypedProperties props) { SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { super(props); + // Make sure key-generator is configured properly + ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(), + "Record key field has to be non-empty!"); + ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(), + "Partition path field has to be non-empty!"); + this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField); this.partitionPathFields = partitionPathField == null ? 
     this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);

From ea2c947722271521c860ee1244586654b20cead0 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 2 Sep 2022 20:04:16 -0700
Subject: [PATCH 25/27] Fixed improperly configured test

---
 .../scala/org/apache/hudi/functional/TestTimeTravelQuery.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 890f8a9019a3..18cef35c3219 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -238,6 +238,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
       .option(PARTITIONPATH_FIELD.key, "")
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -253,6 +254,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
       .option(PARTITIONPATH_FIELD.key, "")
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
@@ -264,6 +266,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
       .option(PARTITIONPATH_FIELD.key, "")
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()

From 8be611a24503d2bd26a924b815b0b92aac4e787a Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Fri, 2 Sep 2022 21:55:04 -0700
Subject: [PATCH 26/27] Fixed invalid partition-path configs in `TestHoodieDeltaStreamer` suite

---
 .../utilities/functional/TestHoodieDeltaStreamer.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 69d6dd7d3b29..86bf170fe84f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1646,7 +1646,7 @@ private void prepareJsonKafkaDFSSource(String propsFileName, String autoResetVal
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
     props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1670,7 +1670,7 @@ private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean autoRes
     prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true,
         HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
     prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "");
+        PARQUET_SOURCE_ROOT, false, "driver");
     // delta streamer w/ parquet source
     String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
@@ -1797,8 +1797,8 @@ public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Except
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean hasTransformer) throws IOException {
     String sourceRoot = dfsBasePath + "/csvFiles";
-    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c0";
-    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "";
+    String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : "_c1";
+    String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" : "_c2";
 
     // Properties used for testing delta-streamer with CSV source
     TypedProperties csvProps = new TypedProperties();
@@ -2070,7 +2070,7 @@ public void testInsertOverwriteTable() throws Exception {
   @Test
   public void testDeletePartitions() throws Exception {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
-        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "");
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
     String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),

From 46e53b5182ffdf6fa43b5a93921222e869e4e535 Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Tue, 6 Sep 2022 16:22:56 -0700
Subject: [PATCH 27/27] Rebased `TimeTravelQuery` test to use `SimpleKeyGenerator` instead

---
 .../org/apache/hudi/functional/TestTimeTravelQuery.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 18cef35c3219..5a71f0e37136 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -237,8 +237,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df1.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -253,8 +252,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df2.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
@@ -265,8 +263,7 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
     df3.write.format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
-      .option(PARTITIONPATH_FIELD.key, "")
-      .option(KEYGENERATOR_CLASS_NAME.key, classOf[NonpartitionedKeyGenerator].getName)
+      .option(PARTITIONPATH_FIELD.key, "name")
       .mode(SaveMode.Append)
       .save(basePath)
     metaClient.reloadActiveTimeline()
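
Taken together, PATCHES 24-27 turn an empty record-key or partition-path config from a silent
misconfiguration into a fail-fast error. Below is a minimal, hypothetical sketch (not part of the
patch series) of how the new `SimpleKeyGenerator` validation surfaces, assuming that
`ValidationUtils.checkArgument` throws `IllegalArgumentException` on a false condition and that
the public `SimpleKeyGenerator(TypedProperties)` constructor forwards the configured field values
to the validated package-private constructor shown in PATCH 24:

  import org.apache.hudi.common.config.TypedProperties;
  import org.apache.hudi.keygen.SimpleKeyGenerator;

  public class SimpleKeyGeneratorValidationSketch {
    public static void main(String[] args) {
      TypedProperties props = new TypedProperties();
      props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
      // An empty partition-path field used to silently produce a non-partitioned
      // table; with PATCH 24 it is expected to fail fast instead.
      props.setProperty("hoodie.datasource.write.partitionpath.field", "");

      try {
        new SimpleKeyGenerator(props);
      } catch (IllegalArgumentException e) {
        // Expected (assumption): "Partition path field has to be non-empty!"
        System.out.println(e.getMessage());
      }
    }
  }

This is why the `TestTimeTravelQuery` and `TestHoodieDeltaStreamer` changes above stop passing
empty partition-path fields and instead point at real columns ("name", "driver", "partition_path",
"_c2") or, in the intermediate PATCH 25, configure `NonpartitionedKeyGenerator` explicitly.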