Merge branch 'master' into SPARK-47258
wayneguow authored Aug 23, 2024
2 parents da17bc7 + d84f1a3 commit fbf3be9
Showing 350 changed files with 20,079 additions and 3,515 deletions.
14 changes: 6 additions & 8 deletions .github/workflows/build_and_test.yml
@@ -867,7 +867,7 @@ jobs:
           # - docs/README.md
           gem install bundler -v 2.4.22
           cd docs
-          bundle install
+          bundle install --retry=100
       - name: Run documentation build
         run: |
           # We need this link to make sure `python3` points to `python3.9` which contains the prerequisite packages.
@@ -1112,14 +1112,12 @@ jobs:
         with:
           distribution: zulu
           java-version: ${{ inputs.java }}
-      - name: start minikube
-        run: |
-          # See more in "Installation" https://minikube.sigs.k8s.io/docs/start/
-          curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
-          sudo install minikube-linux-amd64 /usr/local/bin/minikube
-          rm minikube-linux-amd64
-          # Github Action limit cpu:2, memory: 6947MB, limit to 2U6G for better resource statistic
-          minikube start --cpus 2 --memory 6144
+      - name: Start Minikube
+        uses: medyagh/[email protected]
+        with:
+          # Github Action limit cpu:2, memory: 6947MB, limit to 2U6G for better resource statistic
+          cpus: 2
+          memory: 6144m
       - name: Print K8S pods and nodes info
         run: |
           kubectl get pods -A
11 changes: 5 additions & 6 deletions LICENSE-binary
@@ -422,6 +422,11 @@ Python Software Foundation License
 python/pyspark/loose_version.py
 
 
+BSD 0-Clause
+------------
+org.tukaani:xz
+
+
 BSD 2-Clause
 ------------
 com.github.luben:zstd-jni
@@ -520,12 +525,6 @@ org.glassfish.hk2:hk2-locator
 org.glassfish.hk2:hk2-utils
 org.glassfish.hk2:osgi-resource-locator
 
-
-Public Domain
--------------
-org.tukaani:xz
-
-
 Creative Commons CC0 1.0 Universal Public Domain Dedication
 -----------------------------------------------------------
 (see LICENSE-CC0.txt)
75 changes: 30 additions & 45 deletions R/pkg/R/functions.R
@@ -3965,19 +3965,11 @@ setMethod("row_number",
 #' yields unresolved \code{a.b.c}
 #' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
 #' @keywords internal
-unresolved_named_lambda_var <- function(...) {
-  jc <- newJObject(
-    "org.apache.spark.sql.Column",
-    newJObject(
-      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
-      lapply(list(...), function(x) {
-        handledCallJStatic(
-          "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
-          "freshVarName",
-          x)
-      })
-    )
-  )
+unresolved_named_lambda_var <- function(name) {
+  jc <- handledCallJStatic(
+    "org.apache.spark.sql.api.python.PythonSQLUtils",
+    "unresolvedNamedLambdaVariable",
+    name)
   column(jc)
 }

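For orientation, a hedged sketch of how the reworked helper is exercised. It is internal to SparkR (reached via ::: here, not a public API), and the sketch assumes an active SparkR session:

    # Illustration only: one variable name per call now; the JVM Column is
    # produced by PythonSQLUtils.unresolvedNamedLambdaVariable rather than by
    # building the Catalyst expression with nested newJObject calls.
    library(SparkR)
    sparkR.session()

    v <- SparkR:::unresolved_named_lambda_var("x")
    class(v)  # "Column", wrapping an UnresolvedNamedLambdaVariable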
@@ -3990,7 +3982,6 @@ unresolved_named_lambda_var <- function(...) {
 #' @return JVM \code{LambdaFunction} object
 #' @keywords internal
 create_lambda <- function(fun) {
-  as_jexpr <- function(x) callJMethod(x@jc, "expr")
 
   # Process function arguments
   parameters <- formals(fun)
@@ -4011,22 +4002,18 @@ create_lambda <- function(fun) {
   stopifnot(class(result) == "Column")
 
   # Convert both Columns to Scala expressions
-  jexpr <- as_jexpr(result)
-
   jargs <- handledCallJStatic(
     "org.apache.spark.api.python.PythonUtils",
     "toSeq",
-    handledCallJStatic(
-      "java.util.Arrays", "asList", lapply(args, as_jexpr)
-    )
+    handledCallJStatic("java.util.Arrays", "asList", lapply(args, function(x) { x@jc }))
   )
 
   # Create Scala LambdaFunction
-  newJObject(
-    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
-    jexpr,
-    jargs,
-    FALSE
+  handledCallJStatic(
+    "org.apache.spark.sql.api.python.PythonSQLUtils",
+    "lambdaFunction",
+    result@jc,
+    jargs
   )
 }

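A sketch of the contract this helper now satisfies (create_lambda is internal; same session assumptions as above):

    # An R function taking Column arguments and returning a Column...
    inc <- function(x) x + 1
    # ...becomes a JVM LambdaFunction, now obtained from
    # PythonSQLUtils.lambdaFunction instead of instantiating the Catalyst
    # LambdaFunction class directly via newJObject.
    jlambda <- SparkR:::create_lambda(inc)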
@@ -4039,20 +4026,18 @@
 #' @return a \code{Column} representing name applied to cols with funs
 #' @keywords internal
 invoke_higher_order_function <- function(name, cols, funs) {
-  as_jexpr <- function(x) {
+  as_col <- function(x) {
     if (class(x) == "character") {
       x <- column(x)
     }
-    callJMethod(x@jc, "expr")
+    x@jc
   }
-
-  jexpr <- do.call(newJObject, c(
-    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),
-    lapply(cols, as_jexpr),
-    lapply(funs, create_lambda)
-  ))
-
-  column(newJObject("org.apache.spark.sql.Column", jexpr))
+  jcol <- handledCallJStatic(
+    "org.apache.spark.sql.api.python.PythonSQLUtils",
+    "fn",
+    name,
+    c(lapply(cols, as_col), lapply(funs, create_lambda))) # check varargs invocation
+  column(jcol)
 }

#' @details
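Taken together, the wrappers below now pass a SQL function name through PythonSQLUtils.fn rather than naming a Catalyst expression class. An end-to-end sketch, assuming an active SparkR session:

    df <- createDataFrame(data.frame(id = 1))
    df <- withColumn(df, "xs", expr("array(1, 2, 3)"))

    # Routed through invoke_higher_order_function("transform", ...), i.e.
    # transform(xs, x -> x + 1) on the SQL side.
    head(select(df, array_transform(df$xs, function(x) x + 1)))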
@@ -4068,7 +4053,7 @@ setMethod("array_aggregate",
           signature(x = "characterOrColumn", initialValue = "Column", merge = "function"),
           function(x, initialValue, merge, finish = NULL) {
             invoke_higher_order_function(
-              "ArrayAggregate",
+              "aggregate",
               cols = list(x, initialValue),
               funs = if (is.null(finish)) {
                 list(merge)
@@ -4129,7 +4114,7 @@ setMethod("array_exists",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "ArrayExists",
+              "exists",
               cols = list(x),
               funs = list(f)
             )
@@ -4145,7 +4130,7 @@ setMethod("array_filter",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "ArrayFilter",
+              "filter",
               cols = list(x),
               funs = list(f)
             )
@@ -4161,7 +4146,7 @@ setMethod("array_forall",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "ArrayForAll",
+              "forall",
               cols = list(x),
               funs = list(f)
             )
@@ -4291,7 +4276,7 @@ setMethod("array_sort",
             column(callJStatic("org.apache.spark.sql.functions", "array_sort", x@jc))
           } else {
             invoke_higher_order_function(
-              "ArraySort",
+              "array_sort",
               cols = list(x),
               funs = list(comparator)
             )
@@ -4309,7 +4294,7 @@ setMethod("array_transform",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "ArrayTransform",
+              "transform",
               cols = list(x),
               funs = list(f)
             )
@@ -4374,7 +4359,7 @@ setMethod("arrays_zip_with",
           signature(x = "characterOrColumn", y = "characterOrColumn", f = "function"),
           function(x, y, f) {
             invoke_higher_order_function(
-              "ZipWith",
+              "zip_with",
               cols = list(x, y),
               funs = list(f)
             )
@@ -4447,7 +4432,7 @@ setMethod("map_filter",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "MapFilter",
+              "map_filter",
               cols = list(x),
               funs = list(f))
           })
@@ -4504,7 +4489,7 @@ setMethod("transform_keys",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "TransformKeys",
+              "transform_keys",
               cols = list(x),
               funs = list(f)
             )
@@ -4521,7 +4506,7 @@ setMethod("transform_values",
           signature(x = "characterOrColumn", f = "function"),
           function(x, f) {
             invoke_higher_order_function(
-              "TransformValues",
+              "transform_values",
               cols = list(x),
               funs = list(f)
             )
@@ -4552,7 +4537,7 @@ setMethod("map_zip_with",
           signature(x = "characterOrColumn", y = "characterOrColumn", f = "function"),
           function(x, y, f) {
             invoke_higher_order_function(
-              "MapZipWith",
+              "map_zip_with",
               cols = list(x, y),
               funs = list(f)
             )
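The same renaming runs through the whole block above: each wrapper now passes the lowercase SQL function name (aggregate, exists, filter, forall, array_sort, transform, zip_with, map_filter, transform_keys, transform_values, map_zip_with) instead of a Catalyst class name. A hedged usage sketch, under the same session assumptions as before:

    df <- createDataFrame(data.frame(id = 1))
    df <- withColumn(df, "xs", expr("array(1, 2, 3)"))
    df <- withColumn(df, "ys", expr("array(10, 20, 30)"))

    collect(select(
      df,
      array_exists(df$xs, function(x) x > 2),              # exists(xs, x -> x > 2)
      array_filter(df$xs, function(x) x > 1),              # filter(xs, x -> x > 1)
      arrays_zip_with(df$xs, df$ys, function(x, y) x + y)  # zip_with(xs, ys, (x, y) -> x + y)
    ))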
3 changes: 1 addition & 2 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -147,8 +147,7 @@ test_that("Unsupported operation", {
   # memory sink without aggregation
   df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
   expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
-               paste0(".*(start : analysis error - Complete output mode not supported when there ",
-                      "are no streaming aggregations on streaming DataFrames/Datasets).*"))
+               ".*analysis error.*complete.*not supported.*no streaming aggregations*")
 })
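For reference, expect_error() matches the condition message against this pattern with grepl(), so the looser regex keys on stable fragments rather than exact wording. A hypothetical message it accepts (the real Spark message is not reproduced here):

    msg <- paste("[STREAMING] analysis error - output mode complete is not supported",
                 "when there are no streaming aggregations")
    grepl(".*analysis error.*complete.*not supported.*no streaming aggregations*", msg)
    #> TRUE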

test_that("Terminated by error", {