Skip to content

Commit

Permalink
[SPARK-33366][SQL] Migrate LOAD DATA command to use UnresolvedTable t…
Browse files Browse the repository at this point in the history
…o resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate `LOAD DATA` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).

Note that `LOAD DATA` is not supported for v2 tables.

### Why are the changes needed?

The changes allow consistent resolution behavior when resolving the table identifier. For example, the following is the current behavior:
```scala
sql("CREATE TEMPORARY VIEW t AS SELECT 1")
sql("CREATE DATABASE db")
sql("CREATE TABLE t (key INT, value STRING) USING hive")
sql("USE db")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE t") // Succeeds
```
With this change, `LOAD DATA` above fails with the following:
```
org.apache.spark.sql.AnalysisException: t is a temp view not table.; line 1 pos 0
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTempViews$$anonfun$apply$7.$anonfun$applyOrElse$39(Analyzer.scala:865)
    at scala.Option.foreach(Option.scala:407)
```
, which is expected since temporary view is resolved first and `LOAD DATA` doesn't support a temporary view.

### Does this PR introduce _any_ user-facing change?

After this PR, `LOAD DATA ... t` is resolved to a temp view `t` instead of table `db.t` in the above scenario.

### How was this patch tested?

Updated existing tests.

Closes #30270 from imback82/load_data_cmd.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
imback82 authored and cloud-fan committed Nov 10, 2020
1 parent a1f84d8 commit 90f6f39
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3282,7 +3282,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}

/**
* Create a [[LoadDataStatement]].
* Create a [[LoadData]].
*
* For example:
* {{{
Expand All @@ -3291,8 +3291,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* }}}
*/
override def visitLoadData(ctx: LoadDataContext): LogicalPlan = withOrigin(ctx) {
LoadDataStatement(
tableName = visitMultipartIdentifier(ctx.multipartIdentifier),
LoadData(
child = UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)),
path = string(ctx.path),
isLocal = ctx.LOCAL != null,
isOverwrite = ctx.OVERWRITE != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,6 @@ case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement

/**
* A LOAD DATA INTO TABLE statement, as parsed from SQL
*/
case class LoadDataStatement(
tableName: Seq[String],
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends ParsedStatement

/**
* A SHOW CREATE TABLE statement, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ case class ReplaceTableAsSelect(
}

/**
* The logical plan of the CREATE NAMESPACE command that works for v2 catalogs.
* The logical plan of the CREATE NAMESPACE command.
*/
case class CreateNamespace(
catalog: SupportsNamespaces,
Expand All @@ -270,7 +270,7 @@ case class CreateNamespace(
properties: Map[String, String]) extends Command

/**
* The logical plan of the DROP NAMESPACE command that works for v2 catalogs.
* The logical plan of the DROP NAMESPACE command.
*/
case class DropNamespace(
namespace: LogicalPlan,
Expand All @@ -280,7 +280,7 @@ case class DropNamespace(
}

/**
* The logical plan of the DESCRIBE NAMESPACE command that works for v2 catalogs.
* The logical plan of the DESCRIBE NAMESPACE command.
*/
case class DescribeNamespace(
namespace: LogicalPlan,
Expand All @@ -296,7 +296,7 @@ case class DescribeNamespace(

/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET (DBPROPERTIES|PROPERTIES)
* command that works for v2 catalogs.
* command.
*/
case class AlterNamespaceSetProperties(
namespace: LogicalPlan,
Expand All @@ -305,8 +305,7 @@ case class AlterNamespaceSetProperties(
}

/**
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION
* command that works for v2 catalogs.
* The logical plan of the ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET LOCATION command.
*/
case class AlterNamespaceSetLocation(
namespace: LogicalPlan,
Expand All @@ -315,7 +314,7 @@ case class AlterNamespaceSetLocation(
}

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
* The logical plan of the SHOW NAMESPACES command.
*/
case class ShowNamespaces(
namespace: LogicalPlan,
Expand All @@ -327,7 +326,7 @@ case class ShowNamespaces(
}

/**
* The logical plan of the DESCRIBE relation_name command that works for v2 tables.
* The logical plan of the DESCRIBE relation_name command.
*/
case class DescribeRelation(
relation: LogicalPlan,
Expand All @@ -338,7 +337,7 @@ case class DescribeRelation(
}

/**
* The logical plan of the DESCRIBE relation_name col_name command that works for v2 tables.
* The logical plan of the DESCRIBE relation_name col_name command.
*/
case class DescribeColumn(
relation: LogicalPlan,
Expand All @@ -349,7 +348,7 @@ case class DescribeColumn(
}

/**
* The logical plan of the DELETE FROM command that works for v2 tables.
* The logical plan of the DELETE FROM command.
*/
case class DeleteFromTable(
table: LogicalPlan,
Expand All @@ -358,7 +357,7 @@ case class DeleteFromTable(
}

/**
* The logical plan of the UPDATE TABLE command that works for v2 tables.
* The logical plan of the UPDATE TABLE command.
*/
case class UpdateTable(
table: LogicalPlan,
Expand All @@ -368,7 +367,7 @@ case class UpdateTable(
}

/**
* The logical plan of the MERGE INTO command that works for v2 tables.
* The logical plan of the MERGE INTO command.
*/
case class MergeIntoTable(
targetTable: LogicalPlan,
Expand Down Expand Up @@ -407,7 +406,7 @@ case class Assignment(key: Expression, value: Expression) extends Expression wit
}

/**
* The logical plan of the DROP TABLE command that works for v2 tables.
* The logical plan of the DROP TABLE command.
*/
case class DropTable(
child: LogicalPlan,
Expand All @@ -422,7 +421,7 @@ case class DropTable(
case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command

/**
* The logical plan of the ALTER TABLE command that works for v2 tables.
* The logical plan of the ALTER TABLE command.
*/
case class AlterTable(
catalog: TableCatalog,
Expand Down Expand Up @@ -454,15 +453,15 @@ case class AlterTable(
}

/**
* The logical plan of the ALTER TABLE RENAME command that works for v2 tables.
* The logical plan of the ALTER TABLE RENAME command.
*/
case class RenameTable(
catalog: TableCatalog,
oldIdent: Identifier,
newIdent: Identifier) extends Command

/**
* The logical plan of the SHOW TABLE command that works for v2 catalogs.
* The logical plan of the SHOW TABLE command.
*/
case class ShowTables(
namespace: LogicalPlan,
Expand All @@ -475,7 +474,7 @@ case class ShowTables(
}

/**
* The logical plan of the SHOW VIEWS command that works for v1 and v2 catalogs.
* The logical plan of the SHOW VIEWS command.
*
* Notes: v2 catalogs do not support views API yet, the command will fallback to
* v1 ShowViewsCommand during ResolveSessionCatalog.
Expand All @@ -491,22 +490,22 @@ case class ShowViews(
}

/**
* The logical plan of the USE/USE NAMESPACE command that works for v2 catalogs.
* The logical plan of the USE/USE NAMESPACE command.
*/
case class SetCatalogAndNamespace(
catalogManager: CatalogManager,
catalogName: Option[String],
namespace: Option[Seq[String]]) extends Command

/**
* The logical plan of the REFRESH TABLE command that works for v2 catalogs.
* The logical plan of the REFRESH TABLE command.
*/
case class RefreshTable(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the SHOW CURRENT NAMESPACE command that works for v2 catalogs.
* The logical plan of the SHOW CURRENT NAMESPACE command.
*/
case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command {
override val output: Seq[Attribute] = Seq(
Expand All @@ -515,7 +514,7 @@ case class ShowCurrentNamespace(catalogManager: CatalogManager) extends Command
}

/**
* The logical plan of the SHOW TBLPROPERTIES command that works for v2 catalogs.
* The logical plan of the SHOW TBLPROPERTIES command.
*/
case class ShowTableProperties(
table: LogicalPlan,
Expand Down Expand Up @@ -556,21 +555,21 @@ case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
}

/**
* The logical plan of the REFRESH FUNCTION command that works for v2 catalogs.
* The logical plan of the REFRESH FUNCTION command.
*/
case class RefreshFunction(child: LogicalPlan) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the DESCRIBE FUNCTION command that works for v2 catalogs.
* The logical plan of the DESCRIBE FUNCTION command.
*/
case class DescribeFunction(child: LogicalPlan, isExtended: Boolean) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the DROP FUNCTION command that works for v2 catalogs.
* The logical plan of the DROP FUNCTION command.
*/
case class DropFunction(
child: LogicalPlan,
Expand All @@ -580,7 +579,7 @@ case class DropFunction(
}

/**
* The logical plan of the SHOW FUNCTIONS command that works for v2 catalogs.
* The logical plan of the SHOW FUNCTIONS command.
*/
case class ShowFunctions(
child: Option[LogicalPlan],
Expand All @@ -591,7 +590,7 @@ case class ShowFunctions(
}

/**
* The logical plan of the ANALYZE TABLE command that works for v2 catalogs.
* The logical plan of the ANALYZE TABLE command.
*/
case class AnalyzeTable(
child: LogicalPlan,
Expand All @@ -601,7 +600,7 @@ case class AnalyzeTable(
}

/**
* The logical plan of the ANALYZE TABLE FOR COLUMNS command that works for v2 catalogs.
* The logical plan of the ANALYZE TABLE FOR COLUMNS command.
*/
case class AnalyzeColumn(
child: LogicalPlan,
Expand All @@ -611,3 +610,15 @@ case class AnalyzeColumn(
"mutually exclusive. Only one of them should be specified.")
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the LOAD DATA INTO TABLE command.
*/
case class LoadData(
child: LogicalPlan,
path: String,
isLocal: Boolean,
isOverwrite: Boolean,
partition: Option[TablePartitionSpec]) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -1605,24 +1605,24 @@ class DDLParserSuite extends AnalysisTest {
test("LOAD DATA INTO table") {
comparePlans(
parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", false, false, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", false, false, None))

comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, false, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, false, None))

comparePlans(
parsePlan("LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c"),
LoadDataStatement(Seq("a", "b", "c"), "filepath", true, true, None))
LoadData(UnresolvedTable(Seq("a", "b", "c")), "filepath", true, true, None))

comparePlans(
parsePlan(
s"""
|LOAD DATA LOCAL INPATH 'filepath' OVERWRITE INTO TABLE a.b.c
|PARTITION(ds='2017-06-10')
""".stripMargin),
LoadDataStatement(
Seq("a", "b", "c"),
LoadData(
UnresolvedTable(Seq("a", "b", "c")),
"filepath",
true,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ class ResolveSessionCatalog(
ignoreIfExists = c.ifNotExists)
}

case RefreshTable(r @ ResolvedTable(_, _, _: V1Table)) if isSessionCatalog(r.catalog) =>
RefreshTableCommand(r.identifier.asTableIdentifier)
case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
RefreshTableCommand(ident.asTableIdentifier)

case RefreshTable(r: ResolvedView) =>
RefreshTableCommand(r.identifier.asTableIdentifier)
Expand Down Expand Up @@ -358,9 +358,8 @@ class ResolveSessionCatalog(
orCreate = c.orCreate)
}

case DropTable(
r @ ResolvedTable(_, _, _: V1Table), ifExists, purge) if isSessionCatalog(r.catalog) =>
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)
case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)

// v1 DROP TABLE supports temp view.
case DropTable(r: ResolvedView, ifExists, purge) =>
Expand Down Expand Up @@ -427,10 +426,9 @@ class ResolveSessionCatalog(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")

case LoadDataStatement(tbl, path, isLocal, isOverwrite, partition) =>
val v1TableName = parseV1Table(tbl, "LOAD DATA")
case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
v1TableName.asTableIdentifier,
ident.asTableIdentifier,
path,
isLocal,
isOverwrite,
Expand Down Expand Up @@ -573,9 +571,8 @@ class ResolveSessionCatalog(
"SHOW VIEWS, only SessionCatalog supports this command.")
}

case ShowTableProperties(
r @ ResolvedTable(_, _, _: V1Table), propertyKey) if isSessionCatalog(r.catalog) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey) =>
ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey)

case ShowTableProperties(r: ResolvedView, propertyKey) =>
ShowTablePropertiesCommand(r.identifier.asTableIdentifier, propertyKey)
Expand Down Expand Up @@ -696,9 +693,16 @@ class ResolveSessionCatalog(
}
}

object ResolvedV1TableOrViewIdentifier {
object ResolvedV1TableIdentifier {
def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
case ResolvedTable(catalog, ident, _: V1Table) if isSessionCatalog(catalog) => Some(ident)
case _ => None
}
}

object ResolvedV1TableOrViewIdentifier {
def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
case ResolvedV1TableIdentifier(ident) => Some(ident)
case ResolvedView(ident, _) => Some(ident)
case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) =>
throw new AnalysisException("ANALYZE TABLE is not supported for v2 tables.")

case LoadData(_: ResolvedTable, _, _, _, _) =>
throw new AnalysisException("LOAD DATA is not supported for v2 tables.")

case _ => Nil
}
}
Loading

0 comments on commit 90f6f39

Please sign in to comment.