[SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core #29085

Closed
wants to merge 43 commits into from
Changes from 40 commits
Commits
43 commits
dfcec3c  [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core (AngersZhuuuu, Jul 13, 2020)
e53744b  save (AngersZhuuuu, Jul 13, 2020)
a693722  save (AngersZhuuuu, Jul 13, 2020)
5bfa669  follow comment (AngersZhuuuu, Jul 14, 2020)
ec754e2  fix input and out put format (AngersZhuuuu, Jul 14, 2020)
a2b12a1  follow comment (AngersZhuuuu, Jul 15, 2020)
c3dc66b  follow comment (AngersZhuuuu, Jul 15, 2020)
cb19b7b  follow comment (AngersZhuuuu, Jul 17, 2020)
ce8a0a5  fix bytetype and add it in UT (AngersZhuuuu, Jul 17, 2020)
d37ef86  format code (AngersZhuuuu, Jul 17, 2020)
fce25ff  Fix (maropu, Jul 17, 2020)
f3e05c6  Fix (maropu, Jul 17, 2020)
5c049b5  Merge pull request #5 from maropu/pr29085 (AngersZhuuuu, Jul 18, 2020)
04684a8  fix UT and follow comment (AngersZhuuuu, Jul 18, 2020)
6811721  move ut and add ut for schema less (AngersZhuuuu, Jul 18, 2020)
fc96e1f  follow comment (AngersZhuuuu, Jul 18, 2020)
ed901af  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 18, 2020)
a6f1e7d  catch data convert exception (AngersZhuuuu, Jul 18, 2020)
e367c05  add UTD support (AngersZhuuuu, Jul 18, 2020)
e74d04c  add test (AngersZhuuuu, Jul 18, 2020)
4ef4d76  add data type (AngersZhuuuu, Jul 19, 2020)
22d223c  fix ut (AngersZhuuuu, Jul 19, 2020)
72b2155  added UT (AngersZhuuuu, Jul 20, 2020)
a3628ac  update (AngersZhuuuu, Jul 20, 2020)
e16c136  update title (AngersZhuuuu, Jul 20, 2020)
858f4e5  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 20, 2020)
cfecc90  support array map struct (AngersZhuuuu, Jul 21, 2020)
43d0f24  Revert "support array map struct" (AngersZhuuuu, Jul 21, 2020)
9e18fa8  fix SQLQueryTestSuite (AngersZhuuuu, Jul 22, 2020)
9537d9b  address comment (AngersZhuuuu, Jul 22, 2020)
5227441  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 22, 2020)
670f21b  Update BaseScriptTransformationSuite.scala (AngersZhuuuu, Jul 22, 2020)
ce8184a  address comment (AngersZhuuuu, Jul 22, 2020)
4615733  Update SparkScriptTransformationSuite.scala (AngersZhuuuu, Jul 22, 2020)
08d97c8  throw exception when complex data type (AngersZhuuuu, Jul 22, 2020)
33923b6  https://github.com/apache/spark/pull/29085#discussion_r458676081 (AngersZhuuuu, Jul 22, 2020)
f5ec656  https://github.com/apache/spark/pull/29085#discussion_r458687735 (AngersZhuuuu, Jul 22, 2020)
7916d72  https://github.com/apache/spark/pull/29085#discussion_r458692902 (AngersZhuuuu, Jul 22, 2020)
a769aa7  address comment (AngersZhuuuu, Jul 22, 2020)
d93f7fa  add UT of row format and fi UT (AngersZhuuuu, Jul 22, 2020)
be80c27  address comment (AngersZhuuuu, Jul 23, 2020)
7f3cff8  Update PlanParserSuite.scala (AngersZhuuuu, Jul 23, 2020)
03d3409  address comment (AngersZhuuuu, Jul 23, 2020)
@@ -744,8 +744,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
selectClause.hints.asScala.foldRight(withWindow)(withHints)
}

// Decode an input/output format.
type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])
Member:
Format -> ScriptIOFormat? Then, could you make the comment above clearer?

Contributor Author:
Done


protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): Format = {
// TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
// expects a seq of pairs in which the old parsers' token names are used as keys.
// Transforming the result of visitRowFormatDelimited would be quite a bit messier than
// retrieving the key value pairs ourselves.
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).map(t => key -> t.getText).toSeq
}

val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
entry("TOK_TABLEROWFORMATLINES", ctx.linesSeparatedBy) ++
entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs)

(entries, None, Seq.empty, None)
}
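
As an illustrative sketch (not part of this diff): for a clause such as ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' NULL DEFINED AS 'null', the helper above would produce key/value pairs keyed by the old parser token names, roughly:

  // Sketch of the pairs getRowFormatDelimited would return for the clause above;
  // the values are the parser tokens' literal text, quotes included.
  val entries: Seq[(String, String)] = Seq(
    "TOK_TABLEROWFORMATFIELD" -> "'\\t'",
    "TOK_TABLEROWFORMATNULL" -> "'null'")
  // Wrapped into the Format tuple as (entries, None, Seq.empty, None).

The same token/value shape shows up in the new PlanParserSuite test for ROW FORMAT DELIMITED further down.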

/**
* Create a (Hive based) [[ScriptInputOutputSchema]].
* Create a [[ScriptInputOutputSchema]].
*/
protected def withScriptIOSchema(
ctx: ParserRuleContext,
@@ -754,7 +775,30 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
outRowFormat: RowFormatContext,
recordReader: Token,
schemaLess: Boolean): ScriptInputOutputSchema = {
throw new ParseException("Script Transform is not supported", ctx)

def format(fmt: RowFormatContext): Format = fmt match {
case c: RowFormatDelimitedContext =>
getRowFormatDelimited(c)

case c: RowFormatSerdeContext =>
throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)

// SPARK-32106: When there is no definition about format, we return empty result
// to use a built-in default Serde in SparkScriptTransformationExec.
case null =>
(Nil, None, Seq.empty, None)
}

val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat)

val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat)

ScriptInputOutputSchema(
inFormat, outFormat,
inSerdeClass, outSerdeClass,
inSerdeProps, outSerdeProps,
reader, writer,
schemaLess)
}
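
To make the null branch above concrete, an illustrative note (not taken from the diff): a query with no ROW FORMAT clause and no AS list, such as SELECT TRANSFORM(a, b, c) USING 'cat' FROM testData, parses to an all-empty schema, which is what lets SparkScriptTransformationExec fall back to its built-in default SerDe handling. This matches the first new test case in PlanParserSuite below:

  // Sketch of the schema produced when both inRowFormat and outRowFormat are null:
  ScriptInputOutputSchema(
    Nil, Nil,    // no input/output row-format entries
    None, None,  // no input/output SerDe classes
    Nil, Nil,    // no SerDe properties
    None, None,  // no record reader/writer
    true)        // schemaLess: no AS clause was given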

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

/**
* Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]].
@@ -1031,4 +1031,96 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b))
}

test("SPARK-32106: TRANSFORM without serde") {
Member:
TRANSFORM without serde -> TRANSFORM plan?

Member:
Also, could you check ROW FORMAT SERDE, too?

Contributor Author:
> Also, could you check ROW FORMAT SERDE, too?

Add UT

// verify schema less
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat'
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("key", StringType)(),
AttributeReference("value", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, true))
)

// verify with output column names but without data types
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat' AS (a, b, c)
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

// verify with output schema
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat' AS (a int, b string, c long)
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", IntegerType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", LongType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

// verify with ROW FORMAT DELIMITED
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|ROW FORMAT DELIMITED
|FIELDS TERMINATED BY '\t'
|COLLECTION ITEMS TERMINATED BY '\u0002'
|MAP KEYS TERMINATED BY '\u0003'
|LINES TERMINATED BY '\n'
|NULL DEFINED AS 'null'
|USING 'cat' AS (a, b, c)
|ROW FORMAT DELIMITED
|FIELDS TERMINATED BY '\t'
|COLLECTION ITEMS TERMINATED BY '\u0004'
|MAP KEYS TERMINATED BY '\u0005'
|LINES TERMINATED BY '\n'
|NULL DEFINED AS 'NULL'
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(
Seq(("TOK_TABLEROWFORMATFIELD", "'\\t'"),
("TOK_TABLEROWFORMATCOLLITEMS", "'\u0002'"),
("TOK_TABLEROWFORMATMAPKEYS", "'\u0003'"),
("TOK_TABLEROWFORMATLINES", "'\\n'"),
("TOK_TABLEROWFORMATNULL", "'null'")),
Seq(("TOK_TABLEROWFORMATFIELD", "'\\t'"),
("TOK_TABLEROWFORMATCOLLITEMS", "'\u0004'"),
("TOK_TABLEROWFORMATMAPKEYS", "'\u0005'"),
("TOK_TABLEROWFORMATLINES", "'\\n'"),
("TOK_TABLEROWFORMATNULL", "'NULL'")), None, None,
List.empty, List.empty, None, None, false)))
}
}
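
The ROW FORMAT SERDE coverage asked for in the review is not visible in this 40-commit view. A minimal sketch of such a check, assuming CatalystSqlParser is invoked directly (the suite's own parsing helpers may differ) and using the error message thrown by withScriptIOSchema above:

  import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

  test("TRANSFORM with ROW FORMAT SERDE is rejected in sql/core (sketch)") {
    // Hypothetical test, not part of this diff: the parser throws a ParseException
    // for RowFormatSerdeContext when TRANSFORM is parsed outside hive mode.
    val e = intercept[ParseException] {
      CatalystSqlParser.parsePlan(
        """SELECT TRANSFORM(a)
          |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
          |USING 'cat' AS (a)
          |FROM testData
        """.stripMargin)
    }
    assert(e.getMessage.contains("TRANSFORM with serde is only supported in hive mode"))
  }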