Skip to content

Commit

Permalink
[HUDI-3670] free temp views in sql transformers (#5080)
Browse files Browse the repository at this point in the history
  • Loading branch information
qjqqyy authored Jun 1, 2022
1 parent dfcd6d9 commit 7276d0e
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);
Dataset<Row> transformed = sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
sparkSession.catalog().dropTempView(tmpTable);
return transformed;
}

public String flattenSchema(StructType schema, String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Dataset<Row> apply(
// tmp table name doesn't like dashes
final String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);

try (final Scanner scanner = new Scanner(fs.open(new Path(sqlFile)), "UTF-8")) {
Dataset<Row> rows = null;
Expand All @@ -95,6 +95,8 @@ public Dataset<Row> apply(
return rows;
} catch (final IOException ioe) {
throw new HoodieIOException("Error reading transformer SQL file.", ioe);
} finally {
sparkSession.catalog().dropTempView(tmpTable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Datas
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
LOG.info("Registering tmp table : " + tmpTable);
rowDataset.registerTempTable(tmpTable);
rowDataset.createOrReplaceTempView(tmpTable);
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
LOG.debug("SQL Query for transformation : (" + sqlStr + ")");
return sparkSession.sql(sqlStr);
Dataset<Row> transformed = sparkSession.sql(sqlStr);
sparkSession.catalog().dropTempView(tmpTable);
return transformed;
}
}

0 comments on commit 7276d0e

Please sign in to comment.