[SPARK-31937][SQL] Support processing ArrayType/MapType/StructType data using no-serde mode script transform

### What changes were proposed in this pull request?
Support ArrayType/MapType/StructType data in no-serde mode script transform.

### Why are the changes needed?
Allow users to process array/map/struct data.

### Does this PR introduce _any_ user-facing change?
Yes. Users can now process array/map/struct data in script transform's `no-serde` mode; see the sketch below.
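
For example, the following now runs without a SerDe (an illustrative sketch: the inline `VALUES` data is made up, and the `cat` script assumes a Unix-like environment):

```scala
// Complex-typed output columns are JSON-serialized into the script's stdin
// and parsed back from its stdout into the declared types.
spark.sql(
  """
    |SELECT TRANSFORM(a, b, c)
    |USING 'cat' AS (a ARRAY<INT>, b MAP<STRING, INT>, c STRUCT<col1: INT, col2: STRING>)
    |FROM VALUES (array(1, 2), map('k', 1), struct(1, 'a')) t(a, b, c)
  """.stripMargin).show()
```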

### How was this patch tested?
Added UT

Closes #30957 from AngersZhuuuu/SPARK-31937.

Lead-authored-by: Angerszhuuuu <[email protected]>
Co-authored-by: angerszhu <[email protected]>
Co-authored-by: AngersZhuuuu <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
AngersZhuuuu authored and HyukjinKwon committed Apr 19, 2021
1 parent 7a06cdd commit a74f601
Showing 6 changed files with 83 additions and 53 deletions.
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -79,6 +79,8 @@ license: |

- In Spark 3.2, the `TRANSFORM` operator can't support aliases in its inputs. In Spark 3.1 and earlier, we could write a script transform like `SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL`.

+ - In Spark 3.2, the `TRANSFORM` operator can support `ArrayType/MapType/StructType` without Hive SerDe. In this mode, Spark uses `StructsToJson` to convert `ArrayType/MapType/StructType` columns to `STRING`, and `JsonToStructs` to parse `STRING` back into `ArrayType/MapType/StructType`. In Spark 3.1, Spark only supports casting `ArrayType/MapType/StructType` columns to `STRING`, and can't parse `STRING` into `ArrayType/MapType/StructType` output columns.

## Upgrading from Spark SQL 3.0 to 3.1

- In Spark 3.1, statistical aggregation functions, including `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, and `corr`, return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` is applied to a single-element set. In Spark 3.0 and earlier, they return `Double.NaN` in such cases. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
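
The `StructsToJson` and `JsonToStructs` expressions named above also back the public `to_json` and `from_json` functions, so the text format a script sees on stdin/stdout can be previewed from the DataFrame API. A minimal sketch (assumes an active `SparkSession` with `spark.implicits._` imported):

```scala
import org.apache.spark.sql.functions.{from_json, to_json}
import org.apache.spark.sql.types.{ArrayType, IntegerType}

val df = Seq(Tuple1(Array(1, 2, 3))).toDF("arr")

// Input side of TRANSFORM: the array is serialized to the JSON text "[1,2,3]".
val asJson = df.select(to_json($"arr").as("s"))
asJson.show(truncate = false)

// Output side: the JSON text is parsed back into ARRAY<INT>.
asJson.select(from_json($"s", ArrayType(IntegerType)).as("arr")).printSchema()
```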
sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SparkException, SparkFiles, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Cast, Expression, GenericInternalRow, JsonToStructs, Literal, StructsToJson, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
@@ -47,7 +47,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
def ioschema: ScriptTransformationIOSchema

protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = {
-    input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone))
+    input.map { in =>
+      in.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          new StructsToJson(ioschema.inputSerdeProps.toMap, in)
+            .withTimeZone(conf.sessionLocalTimeZone)
+        case _ => Cast(in, StringType).withTimeZone(conf.sessionLocalTimeZone)
+      }
+    }
}

override def producedAttributes: AttributeSet = outputSet -- inputSet
@@ -220,6 +227,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
case CalendarIntervalType => wrapperConvertException(
data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
converter)
+      case _: ArrayType | _: MapType | _: StructType =>
+        val complexTypeFactory = JsonToStructs(attr.dataType,
+          ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))
+        wrapperConvertException(data =>
+          complexTypeFactory.nullSafeEval(UTF8String.fromString(data)), any => any)
case udt: UserDefinedType[_] =>
wrapperConvertException(data => udt.deserialize(data), converter)
case dt =>
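
For intuition, the two expressions wired up in this file can be evaluated standalone. A sketch against Catalyst internals (non-public API; constructor shapes taken from the diff above, `"UTC"` chosen arbitrarily):

```scala
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, Literal, StructsToJson}
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// Input side: a complex value becomes the JSON line fed to the script's stdin.
val toJson = new StructsToJson(Map.empty[String, String],
  Literal.create(Array(1, 2, 3), ArrayType(IntegerType))).withTimeZone("UTC")
println(toJson.eval(null)) // [1,2,3]

// Output side: a JSON line from the script's stdout becomes a typed value.
val fromJson = JsonToStructs(ArrayType(IntegerType), Map.empty[String, String],
  Literal("[1,2,3]"), Some("UTC"))
println(fromJson.eval(null)) // array data equivalent to Seq(1, 2, 3)
```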
2 changes: 1 addition & 1 deletion sql/core/src/test/resources/sql-tests/inputs/transform.sql
@@ -264,7 +264,7 @@ WHERE a <= 4
WINDOW w AS (PARTITION BY b ORDER BY a);

SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-USING 'cat' AS (a, b, c, d, e)
+USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
FROM script_trans
LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
10 changes: 5 additions & 5 deletions sql/core/src/test/resources/sql-tests/results/transform.sql.out
@@ -495,19 +495,19 @@ struct<a:string,b:string,c:string>

-- !query
SELECT TRANSFORM(b, MAX(a), CAST(SUM(c) AS STRING), myCol, myCol2)
-USING 'cat' AS (a, b, c, d, e)
+USING 'cat' AS (a STRING, b STRING, c STRING, d ARRAY<INT>, e STRING)
FROM script_trans
LATERAL VIEW explode(array(array(1,2,3))) myTable AS myCol
LATERAL VIEW explode(myTable.myCol) myTable2 AS myCol2
WHERE a <= 4
GROUP BY b, myCol, myCol2
HAVING max(a) > 1
-- !query schema
-struct<a:string,b:string,c:string,d:string,e:string>
+struct<a:string,b:string,c:string,d:array<int>,e:string>
-- !query output
-5 4 6 [1, 2, 3] 1
-5 4 6 [1, 2, 3] 2
-5 4 6 [1, 2, 3] 3
+5 4 6 [1,2,3] 1
+5 4 6 [1,2,3] 2
+5 4 6 [1,2,3] 3


-- !query
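
The expected rows change because `d` is now declared as `ARRAY<INT>` and round-tripped through JSON instead of being rendered by `CAST(... AS STRING)`, and the JSON writer omits spaces between elements. A quick comparison of the two renderings (sketch; assumes an active `SparkSession` with implicits imported):

```scala
import org.apache.spark.sql.functions.to_json

val df = Seq(Tuple1(Array(1, 2, 3))).toDF("d")
df.select($"d".cast("string")).show() // [1, 2, 3]  (old CAST rendering)
df.select(to_json($"d")).show()       // [1,2,3]    (new JSON rendering)
```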
sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -302,14 +302,16 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
script = "cat",
output = Seq(
AttributeReference("a", CalendarIntervalType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)(),
AttributeReference("d", StringType)(),
AttributeReference("b", ArrayType(IntegerType))(),
AttributeReference("c", MapType(StringType, IntegerType))(),
AttributeReference("d", StructType(
Array(StructField("_1", IntegerType),
StructField("_2", IntegerType))))(),
AttributeReference("e", new SimpleTupleUDT)()),
child = child,
ioschema = defaultIOSchema
),
-      df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect())
+      df.select('a, 'b, 'c, 'd, 'e).collect())
}
}

@@ -471,6 +473,60 @@
}
}

test("SPARK-31936: Script transform support ArrayType/MapType/StructType (no serde)") {
assume(TestUtils.testCommandAvailable("python"))
withTempView("v") {
val df = Seq(
(Array(0, 1, 2), Array(Array(0, 1), Array(2)),
Map("a" -> 1), Map("b" -> Array("a", "b"))),
(Array(3, 4, 5), Array(Array(3, 4), Array(5)),
Map("b" -> 2), Map("c" -> Array("c", "d"))),
(Array(6, 7, 8), Array(Array(6, 7), Array(8)),
Map("c" -> 3), Map("d" -> Array("e", "f")))
).toDF("a", "b", "c", "d")
.select('a, 'b, 'c, 'd,
struct('a, 'b).as("e"),
struct('a, 'd).as("f"),
struct(struct('a, 'b), struct('a, 'd)).as("g")
)

checkAnswer(
df,
(child: SparkPlan) => createScriptTransformationExec(
input = Seq(
df.col("a").expr,
df.col("b").expr,
df.col("c").expr,
df.col("d").expr,
df.col("e").expr,
df.col("f").expr,
df.col("g").expr),
script = "cat",
output = Seq(
AttributeReference("a", ArrayType(IntegerType))(),
AttributeReference("b", ArrayType(ArrayType(IntegerType)))(),
AttributeReference("c", MapType(StringType, IntegerType))(),
AttributeReference("d", MapType(StringType, ArrayType(StringType)))(),
AttributeReference("e", StructType(
Array(StructField("a", ArrayType(IntegerType)),
StructField("b", ArrayType(ArrayType(IntegerType))))))(),
AttributeReference("f", StructType(
Array(StructField("a", ArrayType(IntegerType)),
StructField("d", MapType(StringType, ArrayType(StringType))))))(),
AttributeReference("g", StructType(
Array(StructField("col1", StructType(
Array(StructField("a", ArrayType(IntegerType)),
StructField("b", ArrayType(ArrayType(IntegerType)))))),
StructField("col2", StructType(
Array(StructField("a", ArrayType(IntegerType)),
StructField("d", MapType(StringType, ArrayType(StringType)))))))))()),
child = child,
ioschema = defaultIOSchema
),
df.select('a, 'b, 'c, 'd, 'e, 'f, 'g).collect())
}
}

test("SPARK-33934: Add SparkFile's root dir to env property PATH") {
assume(TestUtils.testCommandAvailable("python"))
val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.TestUtils
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.test.SharedSparkSession
@@ -59,44 +59,4 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession
assert(e.contains("TRANSFORM with serde is only supported in hive mode"))
}
}

test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " +
"as output data type (no serde)") {
assume(TestUtils.testCommandAvailable("/bin/bash"))
// check for ArrayType
val e1 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(a)
|USING 'cat' AS (a array<int>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e1.contains("SparkScriptTransformation without serde does not support" +
" ArrayType as output data type"))

// check for MapType
val e2 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(b)
|USING 'cat' AS (b map<int, string>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e2.contains("SparkScriptTransformation without serde does not support" +
" MapType as output data type"))

// check for StructType
val e3 = intercept[SparkException] {
sql(
"""
|SELECT TRANSFORM(c)
|USING 'cat' AS (c struct<col1:int, col2:string>)
|FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
""".stripMargin).collect()
}.getMessage
assert(e3.contains("SparkScriptTransformation without serde does not support" +
" StructType as output data type"))
}
}
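
The removed checks are obsolete rather than relocated: after this change the same queries succeed without a SerDe. For instance, the first of the three now returns array values (sketch, reusing the removed test's inline data; `sql` is the `SQLTestUtils` helper):

```scala
// Previously failed with "SparkScriptTransformation without serde does not
// support ArrayType as output data type"; now returns [1, 1] for the row.
sql(
  """
    |SELECT TRANSFORM(a)
    |USING 'cat' AS (a array<int>)
    |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c)
  """.stripMargin).show()
```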
