From b181063ddd13583ee2c6fa862bb476f3a7cbf1ab Mon Sep 17 00:00:00 2001
From: bhat-vinay <152183592+bhat-vinay@users.noreply.github.com>
Date: Mon, 11 Dec 2023 22:08:30 +0530
Subject: [PATCH] [HUDI-7040] Handle dropping of partition columns in
 BulkInsertDataInternalWriterHelper::write(...) (#10272)

Issue: There are two configs which, when set in a certain manner, throw exceptions or asserts:
1. Configs to disable populating metadata fields (for each row)
2. Configs to drop partition columns (to save storage space) from a row

With #1 and #2, partition paths cannot be deduced using partition columns (as the partition columns are dropped higher up the stack). BulkInsertDataInternalWriterHelper::write(...) relied on metadata fields to extract the partition path in such cases, but with #1 that is not possible, resulting in asserts/exceptions.

The fix is to push the dropping of partition columns down the stack, to after the partition path is computed. It manipulates the raw 'InternalRow' structure by copying only the relevant fields into a new 'InternalRow'. Each row is processed individually to drop the partition columns and copy the remaining fields to a new 'InternalRow'.

Co-authored-by: Vinaykumar Bhat
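Illustrative sketch (not part of the change itself): a minimal example of the row-trimming idea, filtering a Spark InternalRow by partition-column index and rebuilding a trimmed row. The schema, field names and values below are made up for illustration:

    // Hypothetical schema; "city" stands in for a partition column to drop.
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val structType = StructType(Seq(
      StructField("uuid", StringType),
      StructField("fare", DoubleType),
      StructField("city", StringType)))

    // Indices of the partition columns within the write schema.
    val partitionIdx = Set(structType.fieldIndex("city"))

    val row = InternalRow(
      UTF8String.fromString("334e26e9"), 19.10, UTF8String.fromString("san_francisco"))

    // InternalRow.toSeq(schema) preserves the schema's column ordering, so fields
    // can be filtered by index and reassembled into a trimmed InternalRow.
    val trimmedRow = InternalRow.fromSeq(
      row.toSeq(structType).zipWithIndex
        .collect { case (value, idx) if !partitionIdx.contains(idx) => value })

The patch applies the same idea in Java inside BulkInsertDataInternalWriterHelper::write(...), looking up the partition columns via HoodieDatasetBulkInsertHelper.getPartitionPathCols(...) and trimming each row only after its partition path has been extracted.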
---
 .../apache/hudi/config/HoodieWriteConfig.java |  4 ++
 .../BulkInsertDataInternalWriterHelper.java   | 34 ++++++++++++-
 .../hudi/HoodieDatasetBulkInsertHelper.scala  | 31 ++++--------
 ...DatasetBulkInsertCommitActionExecutor.java |  3 +-
 .../TestHoodieDatasetBulkInsertHelper.java    | 12 ++---
 .../hudi/TestHoodieSparkSqlWriter.scala       | 48 ++++++++++++++++++-
 6 files changed, 101 insertions(+), 31 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 51895751101c..7a2f2427af81 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1390,6 +1390,10 @@ public boolean shouldAllowMultiWriteOnSameInstant() {
     return getBoolean(ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE);
   }
 
+  public boolean shouldDropPartitionColumns() {
+    return getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS);
+  }
+
   public String getWriteStatusClassName() {
     return getString(WRITE_STATUS_CLASS_NAME);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 7f6054b22966..0773e8a5a0ae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
@@ -38,11 +39,16 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
 /**
  * Helper class for HoodieBulkInsertDataInternalWriter used by Spark datasource v2.
  */
@@ -124,7 +130,33 @@ public void write(InternalRow row) throws IOException {
         lastKnownPartitionPath = partitionPath.clone();
       }
 
-      handle.write(row);
+      boolean shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+      if (shouldDropPartitionColumns) {
+        // Drop the partition columns from the row
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12. Once hudi support for scala versions < 2.12 is
+        // stopped, can move this to JavaConverters.seqAsJavaList(...)
+        List<String> partitionCols = JavaConversions.seqAsJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+        Set<Integer> partitionIdx = new HashSet<>();
+        for (String col : partitionCols) {
+          partitionIdx.add(this.structType.fieldIndex(col));
+        }
+
+        // Relies on InternalRow::toSeq(...) preserving the column ordering based on the supplied schema
+        // Using the deprecated JavaConversions to be compatible with scala versions < 2.12.
+        List<Object> cols = JavaConversions.seqAsJavaList(row.toSeq(structType));
+        int idx = 0;
+        List<Object> newCols = new ArrayList<>();
+        for (Object o : cols) {
+          if (!partitionIdx.contains(idx)) {
+            newCols.add(o);
+          }
+          idx += 1;
+        }
+        InternalRow newRow = InternalRow.fromSeq(JavaConverters.asScalaIteratorConverter(newCols.iterator()).asScala().toSeq());
+        handle.write(newRow);
+      } else {
+        handle.write(row);
+      }
     } catch (Throwable t) {
       LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t);
       throw t;
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 12e446d7be6e..75ec069946d2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -62,7 +62,6 @@ object HoodieDatasetBulkInsertHelper
   def prepareForBulkInsert(df: DataFrame,
                            config: HoodieWriteConfig,
                            partitioner: BulkInsertPartitioner[Dataset[Row]],
-                           shouldDropPartitionColumns: Boolean,
                            instantTime: String): Dataset[Row] = {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
@@ -128,16 +127,10 @@ object HoodieDatasetBulkInsertHelper
       HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
     }
 
-    val trimmedDF = if (shouldDropPartitionColumns) {
-      dropPartitionColumns(updatedDF, config)
-    } else {
-      updatedDF
-    }
-
     val targetParallelism =
-      deduceShuffleParallelism(trimmedDF, config.getBulkInsertShuffleParallelism)
+      deduceShuffleParallelism(updatedDF, config.getBulkInsertShuffleParallelism)
 
-    partitioner.repartitionRecords(trimmedDF, targetParallelism)
+    partitioner.repartitionRecords(updatedDF, targetParallelism)
   }
 
   /**
@@ -243,21 +236,17 @@ object HoodieDatasetBulkInsertHelper
     }
   }
 
-  private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
-    val partitionPathFields = getPartitionPathFields(config).toSet
-    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
-    if (nestedPartitionPathFields.nonEmpty) {
-      logWarning(s"Can not drop nested partition path fields: $nestedPartitionPathFields")
-    }
-
-    val partitionPathCols = (partitionPathFields -- nestedPartitionPathFields).toSeq
-
-    df.drop(partitionPathCols: _*)
-  }
-
   private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = {
     val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
     val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
+
+  def getPartitionPathCols(config: HoodieWriteConfig): Seq[String] = {
+    val partitionPathFields = getPartitionPathFields(config).toSet
+    val nestedPartitionPathFields = partitionPathFields.filter(f => f.contains('.'))
+
+    return (partitionPathFields -- nestedPartitionPathFields).toSeq
+  }
+
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index fb0218137d20..1e20e4ab663d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -95,8 +95,7 @@ public final HoodieWriteResult execute(Dataset<Row> records, boolean isTablePart
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
-    boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS());
-    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime);
+    Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
 
     preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 52c456869783..50ec641c182f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -128,7 +128,7 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "0000000001");
+        new NonSortPartitionerWithRows(), "0000000001");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -172,7 +172,7 @@ public void testBulkInsertHelperNoMetaFields() {
         .build();
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), 10);
@@ -209,7 +209,7 @@ public void testBulkInsertPreCombine(boolean enablePreCombine) {
     rows.addAll(updates);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-        new NonSortPartitionerWithRows(), false, "000001111");
+        new NonSortPartitionerWithRows(), "000001111");
     StructType resultSchema = result.schema();
 
     assertEquals(result.count(), enablePreCombine ? 10 : 15);
@@ -313,7 +313,7 @@ public void testNoPropsSet() {
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -325,7 +325,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
@@ -337,7 +337,7 @@ public void testNoPropsSet() {
     dataset = sqlContext.createDataFrame(rows, structType);
     try {
       Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false, "000001111");
+          new NonSortPartitionerWithRows(), "000001111");
       preparedDF.count();
       fail("Should have thrown exception");
     } catch (Exception e) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 865ca147eb05..38221cc05c7e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertNull, assertTrue, fail}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.Arguments.arguments
@@ -365,6 +365,52 @@ class TestHoodieSparkSqlWriter {
     testBulkInsertWithSortMode(BulkInsertSortMode.NONE, populateMetaFields)
   }
 
+  @Test
+  def testBulkInsertForDropPartitionColumn(): Unit = {
+    // create a new table
+    val tableName = "trips_table"
+    val basePath = "file:///tmp/trips_table"
+    val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
+    val data =
+      Seq((1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
+        (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
+        (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
+        (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
+        (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"));
+
+    var inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "city").
+      option(HoodieWriteConfig.TABLE_NAME, tableName).
+      option("hoodie.datasource.write.recordkey.field", "uuid").
+      option("hoodie.datasource.write.precombine.field", "rider").
+      option("hoodie.datasource.write.operation", "bulk_insert").
+      option("hoodie.datasource.write.hive_style_partitioning", "true").
+      option("hoodie.populate.meta.fields", "false").
+      option("hoodie.datasource.write.drop.partition.columns", "true").
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    // Ensure the partition column (i.e. 'city') can be read back
+    val tripsDF = spark.read.format("hudi").load(basePath)
+    tripsDF.show()
+    tripsDF.select("city").foreach(row => {
+      assertNotNull(row)
+    })
+
+    // Peek into the raw parquet file and ensure partition column is not written to the file
+    val partitions = Seq("city=san_francisco", "city=chennai", "city=sao_paulo")
+    val partitionPaths = new Array[String](3)
+    for (i <- partitionPaths.indices) {
+      partitionPaths(i) = String.format("%s/%s/*", basePath, partitions(i))
+    }
+    val rawFileDf = spark.sqlContext.read.parquet(partitionPaths(0), partitionPaths(1), partitionPaths(2))
+    rawFileDf.show()
+    rawFileDf.select("city").foreach(row => {
+      assertNull(row.get(0))
+    })
+  }
+
   /**
    * Test case for disable and enable meta fields.
    */