diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 611d937b8c69..a4e2dc4055ae 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -34,6 +34,7 @@
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.AvroRuntimeException;
@@ -262,4 +263,30 @@ public String migratePartitionMeta(
         HoodieTableHeaderFields.HEADER_ACTION
     }, rows);
   }
+
+  @CliCommand(value = "repair deprecated partition",
+      help = "Repair deprecated partition (\"default\"). Re-writes data from the deprecated partition into " + PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)
+  public String repairDeprecatedPartition(
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
+          unspecifiedDefaultValue = "") String sparkPropertiesPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+      @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+          help = "Spark executor memory") final String sparkMemory) throws Exception {
+    if (StringUtils.isNullOrEmpty(sparkPropertiesPath)) {
+      sparkPropertiesPath =
+          Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+    }
+
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    sparkLauncher.addAppArgs(SparkMain.SparkCommand.REPAIR_DEPRECATED_PARTITION.toString(), master, sparkMemory,
+        HoodieCLI.getTableMetaClient().getBasePathV2().toString());
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+
+    if (exitCode != 0) {
+      return "Repair deprecated partition failed!";
+    }
+    return "Repair succeeded";
+  }
 }
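Note: the CLI command above only assembles arguments and hands off to a separate Spark application. A hedged sketch of the argument layout SparkMain ends up receiving (the master, memory, and path values are illustrative, not from the patch; only args[3] is read back as the base path):

```java
// Illustrative only: mirrors sparkLauncher.addAppArgs(...) above.
String[] args = new String[] {
    "REPAIR_DEPRECATED_PARTITION",  // args[0]: SparkCommand enum name
    "local[2]",                     // args[1]: --sparkMaster (hypothetical)
    "4G",                           // args[2]: --sparkMemory default
    "/tmp/hoodie/repair_test"       // args[3]: table base path (hypothetical)
};
SparkMain.main(args);               // dispatches to repairDeprecatedPartition(jsc, args[3])
```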
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index ef17abca5793..e293f25d0b71 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -28,10 +28,13 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieBootstrapConfig;
@@ -59,13 +62,18 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.functions;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
@@ -86,7 +94,7 @@ public class SparkMain {
   enum SparkCommand {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
-    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+    CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE, REPAIR_DEPRECATED_PARTITION
   }
 
   public static void main(String[] args) throws Exception {
@@ -270,6 +278,10 @@ public static void main(String[] args) throws Exception {
         assert (args.length == 5);
         returnCode = upgradeOrDowngradeTable(jsc, args[3], args[4]);
         break;
+      case REPAIR_DEPRECATED_PARTITION:
+        assert (args.length == 4);
+        returnCode = repairDeprecatedPartition(jsc, args[3]);
+        break;
       default:
         break;
     }
@@ -414,6 +426,37 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc, String duplica
     return 0;
   }
 
+  public static int repairDeprecatedPartition(JavaSparkContext jsc, String basePath) {
+    SQLContext sqlContext = new SQLContext(jsc);
+    Dataset<Row> recordsToRewrite = sqlContext.read().option("hoodie.datasource.read.extract.partition.values.from.path", "false").format("hudi").load(basePath)
+        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH + "'")
+        .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD).drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+        .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD).drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+
+    if (!recordsToRewrite.isEmpty()) {
+      recordsToRewrite.cache();
+      HoodieTableMetaClient metaClient =
+          HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
+
+      Map<String, String> propsMap = new HashMap<>();
+      metaClient.getTableConfig().getProps().forEach((k, v) -> propsMap.put(k.toString(), v.toString()));
+      propsMap.put(HoodieWriteConfig.SKIP_DEFAULT_PARTITION_VALIDATION.key(), "true");
+      propsMap.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), metaClient.getTableConfig().getRecordKeyFieldProp());
+      propsMap.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), metaClient.getTableConfig().getPartitionFieldProp());
+      propsMap.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), metaClient.getTableConfig().getKeyGeneratorClassName());
+
+      recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(),
+              functions.lit(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)).write().options(propsMap)
+          .option("hoodie.datasource.write.operation", "insert").format("hudi").mode("Append").save(basePath);
+
+      // after re-writing, we can safely delete the older data.
+      propsMap.put("hoodie.datasource.write.partitions.to.delete", PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH);
+      recordsToRewrite.write().options(propsMap).option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value()).format("hudi")
+          .mode("Append").save(basePath);
+    }
+    return 0;
+  }
+
   private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath,
       String sourcePath, String recordKeyCols, String partitionFields, String parallelism,
       String schemaProviderClass, String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider,
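For reviewers unfamiliar with the cleanup step: the second write above leans on the datasource's delete_partition operation, driven entirely by the partitions-to-delete option (both option keys appear verbatim in the patch). A standalone hedged sketch of that write path; the base path and session setup are illustrative, not part of this patch:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DeletePartitionSketch {
  public static void main(String[] args) {
    String basePath = "file:///tmp/hoodie/repair_test"; // hypothetical table
    SparkSession spark = SparkSession.builder()
        .master("local[2]").appName("delete-partition-sketch").getOrCreate();

    // The dataframe content is largely irrelevant for delete_partition;
    // the 'partitions.to.delete' option selects which partition is dropped.
    Dataset<Row> df = spark.read().format("hudi").load(basePath).limit(1);
    df.write().format("hudi")
        .option("hoodie.datasource.write.operation", "delete_partition")
        .option("hoodie.datasource.write.partitions.to.delete", "default")
        .mode("Append")
        .save(basePath);

    spark.stop();
  }
}
```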
+ propsMap.put("hoodie.datasource.write.partitions.to.delete", PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH); + recordsToRewrite.write().options(propsMap).option("hoodie.datasource.write.operation", WriteOperationType.DELETE_PARTITION.value()).format("hudi") + .mode("Append").save(basePath); + } + return 0; + } + private static int doBootstrap(JavaSparkContext jsc, String tableName, String tableType, String basePath, String sourcePath, String recordKeyCols, String partitionFields, String parallelism, String schemaProviderClass, String bootstrapIndexClass, String selectorClass, String keyGenerator, String fullBootstrapInputProvider, diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java index 96e0873da509..3f35c9d96aa1 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java @@ -23,16 +23,29 @@ import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.testutils.Assertions; +import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.SQLContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -44,6 +57,7 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -59,6 +73,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.VERSION; import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum; import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum; +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -235,4 +250,73 @@ public void testRemoveCorruptedPendingCleanAction() throws IOException { metaClient = HoodieTableMetaClient.reload(metaClient); assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count()); } + + @Test + public void testRepairDeprecatedPartition() throws IOException { + tablePath = tablePath + 
"/repair_test/"; + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE.name()) + .setTableName(tableName()) + .setArchiveLogFolder(HoodieTableConfig.ARCHIVELOG_FOLDER.defaultValue()) + .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload") + .setTimelineLayoutVersion(TimelineLayoutVersion.VERSION_1) + .setPartitionFields("partition_path") + .setRecordKeyFields("_row_key") + .setKeyGeneratorClassProp(SimpleKeyGenerator.class.getCanonicalName()) + .initTable(HoodieCLI.conf, tablePath); + + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(TRIP_EXAMPLE_SCHEMA).build(); + + try (SparkRDDWriteClient client = new SparkRDDWriteClient(context(), config)) { + String newCommitTime = "001"; + int numRecords = 10; + client.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, numRecords); + JavaRDD writeRecords = context().getJavaSparkContext().parallelize(records, 1); + List result = client.upsert(writeRecords, newCommitTime).collect(); + Assertions.assertNoWriteErrors(result); + + newCommitTime = "002"; + // Generate HoodieRecords w/ null values for partition path field. + List records1 = dataGen.generateInserts(newCommitTime, numRecords); + List records2 = new ArrayList<>(); + records1.forEach(entry -> { + HoodieKey hoodieKey = new HoodieKey(entry.getRecordKey(), PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH); + RawTripTestPayload testPayload = (RawTripTestPayload) entry.getData(); + try { + GenericRecord genericRecord = (GenericRecord) testPayload.getRecordToInsert(HoodieTestDataGenerator.AVRO_SCHEMA); + genericRecord.put("partition_path", null); + records2.add(new HoodieAvroRecord(hoodieKey, new RawTripTestPayload(genericRecord.toString(), hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), TRIP_EXAMPLE_SCHEMA))); + } catch (IOException e) { + e.printStackTrace(); + } + }); + + client.startCommitWithTime(newCommitTime); + // ingest records2 which has null for partition path fields, but goes into "default" partition. 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 49bb138e54cd..81a1f32ca2bf 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -96,9 +96,10 @@ public class HoodieTestDataGenerator implements AutoCloseable {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
+
   public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"long\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
-      + "{\"name\": \"partition_path\", \"type\": \"string\"},"
+      + "{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null },"
       + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
       + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},";
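This schema change is what lets the test null out partition_path without tripping Avro validation; a minimal hedged illustration, with the schema trimmed down to the one affected field:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class NullablePartitionFieldSketch {
  public static void main(String[] args) {
    // Trimmed to the relevant field: partition_path is now a ["null","string"] union.
    Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"triprec\", \"fields\": ["
            + "{\"name\": \"partition_path\", \"type\": [\"null\", \"string\"], \"default\": null}]}");
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("partition_path", null); // invalid under the old non-null schema, valid now
    System.out.println(GenericData.get().validate(schema, rec)); // prints: true
  }
}
```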