Skip to content

Commit

Permalink
[SPARK-44327][SQL][CONNECT] Add functions any and len to Scala
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add functions `any` and `len` to Scala API and Spark Connect Scala Client

### Why are the changes needed?
They were skipped in previous PRs because they are keywords or built-in functions on the Python side, but we can still add them to the Scala API.

We already have some scala-specific functions, like `not`

### Does this PR introduce _any_ user-facing change?
yes

### How was this patch tested?
added UT

Closes apache#41886 from zhengruifeng/scala_dedicated_functions.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
  • Loading branch information
zhengruifeng committed Jul 7, 2023
1 parent d4277b8 commit 7e9b027
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,14 @@ object functions {
*/
def some(e: Column): Column = Column.fn("some", e)

/**
 * Aggregate function: returns true if at least one value of `e` is true.
 *
 * @note Scala-only convenience: equivalent to [[some]] and [[bool_or]] — in the sql/core
 *       implementation all three delegate to the same `BoolOr` aggregate.
 *
 * @group agg_funcs
 * @since 3.5.0
 */
def any(e: Column): Column = Column.fn("any", e)

/**
* Aggregate function: returns true if at least one value of `e` is true.
*
Expand Down Expand Up @@ -3686,6 +3694,16 @@ object functions {
*/
def length(e: Column): Column = Column.fn("length", e)

/**
 * Computes the character length of a given string or number of bytes of a binary string. The
 * length of character strings includes the trailing spaces. The length of binary strings
 * includes binary zeros.
 *
 * @note Scala-only convenience: equivalent to [[length]] (`len` is a keyword on the Python
 *       side, so it is exposed only in the Scala API).
 *
 * @group string_funcs
 * @since 3.5.0
 */
def len(e: Column): Column = Column.fn("len", e)

/**
* Converts a string column to lower case.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,10 @@ class PlanGenerationTestSuite
boolean.select(fn.some(fn.col("flag")))
}

// Golden-file plan test for the new `fn.any` aggregate (mirrors the `some` test above).
test("function any") {
  boolean.select(fn.any(fn.col("flag")))
}

test("function bool_or") {
boolean.select(fn.bool_or(fn.col("flag")))
}
Expand Down Expand Up @@ -1629,6 +1633,10 @@ class PlanGenerationTestSuite
fn.length(fn.col("g"))
}

// Golden-file plan test for the new `fn.len` string function (mirrors the `length` test above).
functionTest("len") {
  fn.len(fn.col("g"))
}

functionTest("lower") {
fn.lower(fn.col("g"))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Aggregate [max(flag#0) AS any(flag)#0]
+- LocalRelation <empty>, [id#0L, flag#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Project [len(g#0) AS len(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,flag:boolean\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "any",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "flag"
}
}]
}
}]
}
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"common": {
"planId": "1"
},
"project": {
"input": {
"common": {
"planId": "0"
},
"localRelation": {
"schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
}
},
"expressions": [{
"unresolvedFunction": {
"functionName": "len",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "g"
}
}]
}
}]
}
}
Binary file not shown.
2 changes: 2 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ def test_function_parity(self):
"typedLit", # Scala only
"monotonicallyIncreasingId", # depreciated, use monotonically_increasing_id
"not", # equivalent to python ~expression
"any", # equivalent to python ~some
"len", # equivalent to python ~length
"udaf", # used for creating UDAF's which are not supported in PySpark
]

Expand Down
18 changes: 18 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,14 @@ object functions {
*/
def some(e: Column): Column = withAggregateFunction { BoolOr(e.expr) }

/**
 * Aggregate function: returns true if at least one value of `e` is true.
 *
 * @note Equivalent to [[some]] and [[bool_or]]: all three wrap the same `BoolOr` aggregate
 *       expression. `any` is exposed only in Scala because it is a built-in in Python.
 *
 * @group agg_funcs
 * @since 3.5.0
 */
def any(e: Column): Column = withAggregateFunction { BoolOr(e.expr) }

/**
* Aggregate function: returns true if at least one value of `e` is true.
*
Expand Down Expand Up @@ -3769,6 +3777,16 @@ object functions {
*/
def length(e: Column): Column = withExpr { Length(e.expr) }

/**
 * Computes the character length of a given string or number of bytes of a binary string.
 * The length of character strings includes the trailing spaces. The length of binary strings
 * includes binary zeros.
 *
 * @note Equivalent to [[length]]: both wrap the `Length` expression. `len` is exposed only in
 *       Scala because it is a built-in in Python.
 *
 * @group string_funcs
 * @since 3.5.0
 */
def len(e: Column): Column = withExpr { Length(e.expr) }

/**
* Converts a string column to lower case.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,11 @@ class DataFrameAggregateSuite extends QueryTest
checkAnswer(testData2.agg(regr_syy($"a", $"b")), testData2.selectExpr("regr_syy(a, b)"))
}

test("every | bool_and | some | bool_or") {
// Checks each Scala DSL boolean aggregate against its SQL-expression counterpart;
// `any` was added to this list by SPARK-44327.
test("every | bool_and | some | any | bool_or") {
  checkAnswer(complexData.agg(every($"b")), complexData.selectExpr("every(b)"))
  checkAnswer(complexData.agg(bool_and($"b")), complexData.selectExpr("bool_and(b)"))
  checkAnswer(complexData.agg(some($"b")), complexData.selectExpr("some(b)"))
  checkAnswer(complexData.agg(any($"b")), complexData.selectExpr("any(b)"))
  checkAnswer(complexData.agg(bool_or($"b")), complexData.selectExpr("bool_or(b)"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ class StringFunctionsSuite extends QueryTest with SharedSparkSession {
df.select(length($"a"), length($"b")),
Row(3, 4))

checkAnswer(
df.select(len($"a"), len($"b")),
Row(3, 4))

Seq("length", "len").foreach { len =>
checkAnswer(df.selectExpr(s"$len(a)", s"$len(b)"), Row(3, 4))
checkAnswer(df.selectExpr(s"$len(c)", s"$len(d)", s"$len(e)"), Row(3, 3, 5))
Expand Down

0 comments on commit 7e9b027

Please sign in to comment.