Skip to content

Commit

Permalink
[SPARK-49564][SQL] Add SQL pipe syntax for set operations
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds SQL pipe syntax support for the set operations: UNION, INTERSECT, EXCEPT, and MINUS (each with an optional ALL or DISTINCT quantifier).

For example:

```
CREATE TABLE t(x INT, y STRING) USING CSV;
INSERT INTO t VALUES (0, 'abc'), (1, 'def');

TABLE t
|> UNION ALL (SELECT * FROM t);

0	abc
0	abc
1	def
1	def
```

### Why are the changes needed?

The SQL pipe operator syntax will let users compose queries in a more flexible fashion.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds a few unit test cases but mostly relies on golden file test coverage. I did this to make sure the answers remain correct as this feature is implemented, and also so we can inspect the analyzer output plans to ensure they look right as well.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48359 from dtenedor/pipe-union.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
dtenedor authored and cloud-fan committed Oct 9, 2024
1 parent 5e27eec commit 135cbc6
Show file tree
Hide file tree
Showing 6 changed files with 481 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ operatorPipeRightSide
| unpivotClause pivotClause?
| sample
| joinRelation
| operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1407,10 +1407,13 @@ class AstBuilder extends DataTypeAstBuilder
* - INTERSECT [DISTINCT | ALL]
*/
override def visitSetOperation(ctx: SetOperationContext): LogicalPlan = withOrigin(ctx) {
val left = plan(ctx.left)
val right = plan(ctx.right)
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
ctx.operator.getType match {
visitSetOperationImpl(plan(ctx.left), plan(ctx.right), all, ctx.operator.getType)
}

private def visitSetOperationImpl(
left: LogicalPlan, right: LogicalPlan, all: Boolean, operatorType: Int): LogicalPlan = {
operatorType match {
case SqlBaseParser.UNION if all =>
Union(left, right)
case SqlBaseParser.UNION =>
Expand Down Expand Up @@ -5918,7 +5921,10 @@ class AstBuilder extends DataTypeAstBuilder
withSample(c, left)
}.getOrElse(Option(ctx.joinRelation()).map { c =>
withJoinRelation(c, left)
}.get)))))
}.getOrElse(Option(ctx.operator).map { c =>
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.get))))))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,208 @@ org.apache.spark.sql.catalyst.parser.ParseException
}


-- !query
table t
|> union all table t
-- !query analysis
Union false, false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union all table t
-- !query analysis
Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
values (0, 'abc') tab(x, y)
|> union all table t
-- !query analysis
Union false, false
:- SubqueryAlias tab
: +- LocalRelation [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
values (0, 1) tab(x, y)
|> union table t
-- !query analysis
Distinct
+- Union false, false
:- Project [x#x, cast(y#x as string) AS y#x]
: +- SubqueryAlias tab
: +- LocalRelation [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
(select * from t)
|> union all (select * from t)
-- !query analysis
Union false, false
:- Project [x#x, y#x]
: +- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> except all table t
-- !query analysis
Except All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> except table t
-- !query analysis
Except false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> intersect all table t
-- !query analysis
Intersect All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> intersect table t
-- !query analysis
Intersect false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> minus all table t
-- !query analysis
Except All true
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> minus table t
-- !query analysis
Except false
:- SubqueryAlias spark_catalog.default.t
: +- Relation spark_catalog.default.t[x#x,y#x] csv
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select x
|> union all table t
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "NUM_COLUMNS_MISMATCH",
"sqlState" : "42826",
"messageParameters" : {
"firstNumColumns" : "1",
"invalidNumColumns" : "2",
"invalidOrdinalNum" : "second",
"operator" : "UNION"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 40,
"fragment" : "table t\n|> select x\n|> union all table t"
} ]
}


-- !query
table t
|> union all table st
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "INCOMPATIBLE_COLUMN_TYPE",
"sqlState" : "42825",
"messageParameters" : {
"columnOrdinalNumber" : "second",
"dataType1" : "\"STRUCT<i1: INT, i2: INT>\"",
"dataType2" : "\"STRING\"",
"hint" : "",
"operator" : "UNION",
"tableOrdinalNumber" : "second"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 1,
"stopIndex" : 29,
"fragment" : "table t\n|> union all table st"
} ]
}


-- !query
drop table t
-- !query analysis
Expand Down
67 changes: 67 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,73 @@ table join_test_t1
table join_test_t1 jt
|> cross join (select * from jt);

-- Set operations: positive tests.
-- Each pipe operator |> UNION/EXCEPT/INTERSECT/MINUS applies the set operation
-- between the piped-in relation (left side) and the relation that follows it.
-----------------------------------

-- Union all.
table t
|> union all table t;

-- Union distinct (no quantifier defaults to DISTINCT, matching regular SQL).
table t
|> union table t;

-- Union all with a table subquery.
(select * from t)
|> union all table t;

-- Union distinct with a table subquery.
(select * from t)
|> union table t;

-- Union all with a VALUES list.
values (0, 'abc') tab(x, y)
|> union all table t;

-- Union distinct with a VALUES list.
-- The INT column y is implicitly cast to STRING to match t's schema.
values (0, 1) tab(x, y)
|> union table t;

-- Union all with a table subquery on both the source and target sides.
(select * from t)
|> union all (select * from t);

-- Except all.
table t
|> except all table t;

-- Except distinct.
table t
|> except table t;

-- Intersect all.
table t
|> intersect all table t;

-- Intersect distinct.
table t
|> intersect table t;

-- Minus all (MINUS is an alias for EXCEPT; the plan is Except All).
table t
|> minus all table t;

-- Minus distinct.
table t
|> minus table t;

-- Set operations: negative tests.
-----------------------------------

-- The UNION operator requires the same number of columns in the input relations
-- (expects error NUM_COLUMNS_MISMATCH).
table t
|> select x
|> union all table t;

-- The UNION operator requires the column types to be compatible
-- (expects error INCOMPATIBLE_COLUMN_TYPE).
table t
|> union all table st;

-- Cleanup.
-----------
drop table t;
Expand Down
Loading

0 comments on commit 135cbc6

Please sign in to comment.