diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java index cf7b67b449fe..8aa032666e9c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java @@ -49,8 +49,10 @@ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Datas // tmp table name doesn't like dashes String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); LOG.info("Registering tmp table : " + tmpTable); - rowDataset.registerTempTable(tmpTable); - return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable); + rowDataset.createOrReplaceTempView(tmpTable); + Dataset transformed = sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable); + sparkSession.catalog().dropTempView(tmpTable); + return transformed; } public String flattenSchema(StructType schema, String prefix) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java index 04264bf4cb3d..a53b50431c8d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlFileBasedTransformer.java @@ -76,7 +76,7 @@ public Dataset apply( // tmp table name doesn't like dashes final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); LOG.info("Registering tmp table : " + tmpTable); - rowDataset.registerTempTable(tmpTable); + rowDataset.createOrReplaceTempView(tmpTable); try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) { Dataset rows = null; @@ -95,6 +95,8 @@ public Dataset apply( return rows; } catch (final IOException ioe) { throw new HoodieIOException("Error reading transformer SQL file.", ioe); + } finally { + sparkSession.catalog().dropTempView(tmpTable); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java index 7e5ed05f26b9..e39ca7463148 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java @@ -60,9 +60,11 @@ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Datas // tmp table name doesn't like dashes String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_")); LOG.info("Registering tmp table : " + tmpTable); - rowDataset.registerTempTable(tmpTable); + rowDataset.createOrReplaceTempView(tmpTable); String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable); LOG.debug("SQL Query for transformation : (" + sqlStr + ")"); - return sparkSession.sql(sqlStr); + Dataset transformed = sparkSession.sql(sqlStr); + sparkSession.catalog().dropTempView(tmpTable); + return transformed; } }