diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index f0481a1a7073c..9d237f069132a 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -1511,6 +1511,7 @@ operatorPipeRightSide
     | unpivotClause pivotClause?
     | sample
     | joinRelation
+    | operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
     ;
 
 // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2b0443c01f6d5..c9150b8a26100 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1407,10 +1407,13 @@ class AstBuilder extends DataTypeAstBuilder
    * - INTERSECT [DISTINCT | ALL]
    */
   override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) {
-    val left = plan(ctx.left)
-    val right = plan(ctx.right)
     val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
-    ctx.operator.getType match {
+    visitSetOperationImpl(plan(ctx.left), plan(ctx.right), all, ctx.operator.getType)
+  }
+
+  private def visitSetOperationImpl(
+      left: LogicalPlan, right: LogicalPlan, all: Boolean, operatorType: Int): LogicalPlan = {
+    operatorType match {
       case SqlBaseParser.UNION if all =>
         Union(left, right)
       case SqlBaseParser.UNION =>
@@ -5918,7 +5921,10 @@ class AstBuilder extends DataTypeAstBuilder
         withSample(c, left)
       }.getOrElse(Option(ctx.joinRelation()).map { c =>
         withJoinRelation(c, left)
-      }.get)))))
+      }.getOrElse(Option(ctx.operator).map { c =>
+        val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
+        visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
+      }.get))))))
   }
 
   /**
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 4479c93f6e84e..7fa4ec0514ff0 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -1820,6 +1820,208 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> union all table t
+-- !query analysis
+Union false, false
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> union table t
+-- !query analysis
+Distinct
++- Union false, false
+   :- SubqueryAlias spark_catalog.default.t
+   :  +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(select * from t)
+|> union all table t
+-- !query analysis
+Union false, false
+:- Project [x#x, y#x]
+:  +- SubqueryAlias spark_catalog.default.t
+:     +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(select * from t)
+|> union table t
+-- !query analysis
+Distinct
++- Union false, false
+   :- Project [x#x, y#x]
+   :  +- SubqueryAlias spark_catalog.default.t
+   :     +- Relation spark_catalog.default.t[x#x,y#x] csv
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+values (0, 'abc') tab(x, y)
+|> union all table t
+-- !query analysis
+Union false, false
+:- SubqueryAlias tab
+:  +- LocalRelation [x#x, y#x]
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+values (0, 1) tab(x, y)
+|> union table t
+-- !query analysis
+Distinct
++- Union false, false
+   :- Project [x#x, cast(y#x as string) AS y#x]
+   :  +- SubqueryAlias tab
+   :     +- LocalRelation [x#x, y#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+(select * from t)
+|> union all (select * from t)
+-- !query analysis
+Union false, false
+:- Project [x#x, y#x]
+:  +- SubqueryAlias spark_catalog.default.t
+:     +- Relation spark_catalog.default.t[x#x,y#x] csv
++- Project [x#x, y#x]
+   +- SubqueryAlias spark_catalog.default.t
+      +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> except all table t
+-- !query analysis
+Except All true
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> except table t
+-- !query analysis
+Except false
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> intersect all table t
+-- !query analysis
+Intersect All true
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> intersect table t
+-- !query analysis
+Intersect false
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> minus all table t
+-- !query analysis
+Except All true
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> minus table t
+-- !query analysis
+Except false
+:- SubqueryAlias spark_catalog.default.t
+:  +- Relation spark_catalog.default.t[x#x,y#x] csv
++- SubqueryAlias spark_catalog.default.t
+   +- Relation spark_catalog.default.t[x#x,y#x] csv
+
+
+-- !query
+table t
+|> select x
+|> union all table t
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "NUM_COLUMNS_MISMATCH",
+  "sqlState" : "42826",
+  "messageParameters" : {
+    "firstNumColumns" : "1",
+    "invalidNumColumns" : "2",
+    "invalidOrdinalNum" : "second",
+    "operator" : "UNION"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 40,
+    "fragment" : "table t\n|> select x\n|> union all table t"
+  } ]
+}
+
+
+-- !query
+table t
+|> union all table st
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INCOMPATIBLE_COLUMN_TYPE",
+  "sqlState" : "42825",
+  "messageParameters" : {
+    "columnOrdinalNumber" : "second",
+    "dataType1" : "\"STRUCT\"",
+    "dataType2" : "\"STRING\"",
+    "hint" : "",
"operator" : "UNION", + "tableOrdinalNumber" : "second" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 29, + "fragment" : "table t\n|> union all table st" + } ] +} + + -- !query drop table t -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 1f8450e3507cb..61890f5cb146d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -504,6 +504,73 @@ table join_test_t1 table join_test_t1 jt |> cross join (select * from jt); +-- Set operations: positive tests. +----------------------------------- + +-- Union all. +table t +|> union all table t; + +-- Union distinct. +table t +|> union table t; + +-- Union all with a table subquery. +(select * from t) +|> union all table t; + +-- Union distinct with a table subquery. +(select * from t) +|> union table t; + +-- Union all with a VALUES list. +values (0, 'abc') tab(x, y) +|> union all table t; + +-- Union distinct with a VALUES list. +values (0, 1) tab(x, y) +|> union table t; + +-- Union all with a table subquery on both the source and target sides. +(select * from t) +|> union all (select * from t); + +-- Except all. +table t +|> except all table t; + +-- Except distinct. +table t +|> except table t; + +-- Intersect all. +table t +|> intersect all table t; + +-- Intersect distinct. +table t +|> intersect table t; + +-- Minus all. +table t +|> minus all table t; + +-- Minus distinct. +table t +|> minus table t; + +-- Set operations: negative tests. +----------------------------------- + +-- The UNION operator requires the same number of columns in the input relations. +table t +|> select x +|> union all table t; + +-- The UNION operator requires the column types to be compatible. +table t +|> union all table st; + -- Cleanup. 
 -----------
 drop table t;
diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index a54e66e53f0f3..8cbc5357d78b6 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -1484,6 +1484,195 @@ org.apache.spark.sql.catalyst.parser.ParseException
 }
 
 
+-- !query
+table t
+|> union all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+0	abc
+1	def
+1	def
+
+
+-- !query
+table t
+|> union table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+(select * from t)
+|> union all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+0	abc
+1	def
+1	def
+
+
+-- !query
+(select * from t)
+|> union table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+values (0, 'abc') tab(x, y)
+|> union all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+0	abc
+1	def
+
+
+-- !query
+values (0, 1) tab(x, y)
+|> union table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	1
+0	abc
+1	def
+
+
+-- !query
+(select * from t)
+|> union all (select * from t)
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+0	abc
+1	def
+1	def
+
+
+-- !query
+table t
+|> except all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+
+
+
+-- !query
+table t
+|> except table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+
+
+
+-- !query
+table t
+|> intersect all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+table t
+|> intersect table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+0	abc
+1	def
+
+
+-- !query
+table t
+|> minus all table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+
+
+
+-- !query
+table t
+|> minus table t
+-- !query schema
+struct<x:int,y:string>
+-- !query output
+
+
+
+-- !query
+table t
+|> select x
+|> union all table t
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "NUM_COLUMNS_MISMATCH",
+  "sqlState" : "42826",
+  "messageParameters" : {
+    "firstNumColumns" : "1",
+    "invalidNumColumns" : "2",
+    "invalidOrdinalNum" : "second",
+    "operator" : "UNION"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 40,
+    "fragment" : "table t\n|> select x\n|> union all table t"
+  } ]
+}
+
+
+-- !query
+table t
+|> union all table st
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INCOMPATIBLE_COLUMN_TYPE",
+  "sqlState" : "42825",
+  "messageParameters" : {
+    "columnOrdinalNumber" : "second",
+    "dataType1" : "\"STRUCT\"",
+    "dataType2" : "\"STRING\"",
+    "hint" : "",
+    "operator" : "UNION",
+    "tableOrdinalNumber" : "second"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 29,
+    "fragment" : "table t\n|> union all table st"
+  } ]
+}
+
+
 -- !query
 drop table t
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 20b9c9caa7493..fc1c9c6755572 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -943,6 +943,18 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession {
"LEFT ANTI", "CROSS").foreach { joinType => checkPipeJoin(s"TABLE t |> $joinType JOIN other ON (t.x = other.x)") } + // Set operations + def checkDistinct(query: String): Unit = check(query, Seq(DISTINCT_LIKE)) + def checkExcept(query: String): Unit = check(query, Seq(EXCEPT)) + def checkIntersect(query: String): Unit = check(query, Seq(INTERSECT)) + def checkUnion(query: String): Unit = check(query, Seq(UNION)) + checkDistinct("TABLE t |> UNION DISTINCT TABLE t") + checkExcept("TABLE t |> EXCEPT ALL TABLE t") + checkExcept("TABLE t |> EXCEPT DISTINCT TABLE t") + checkExcept("TABLE t |> MINUS ALL TABLE t") + checkExcept("TABLE t |> MINUS DISTINCT TABLE t") + checkIntersect("TABLE t |> INTERSECT ALL TABLE t") + checkUnion("TABLE t |> UNION ALL TABLE t") } } }