[SPARK-17838][SparkR] Check named arguments for options and use formatted R friendly message from JVM exception message #15608

Closed · wants to merge 3 commits · showing changes from 2 commits
10 changes: 5 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -761,7 +761,7 @@ setMethod("write.json",
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
-invisible(callJMethod(write, "json", path))
+invisible(handledCallJMethod(write, "json", path))
})

#' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
@@ -792,7 +792,7 @@ setMethod("write.orc",
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
-invisible(callJMethod(write, "orc", path))
+invisible(handledCallJMethod(write, "orc", path))
})

#' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
@@ -824,7 +824,7 @@ setMethod("write.parquet",
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
-invisible(callJMethod(write, "parquet", path))
+invisible(handledCallJMethod(write, "parquet", path))
})

#' @rdname write.parquet
@@ -868,7 +868,7 @@ setMethod("write.text",
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
-invisible(callJMethod(write, "text", path))
+invisible(handledCallJMethod(write, "text", path))
})

#' Distinct
@@ -3315,7 +3315,7 @@ setMethod("write.jdbc",
jprops <- varargsToJProperties(...)
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
-invisible(callJMethod(write, "jdbc", url, tableName, jprops))
+invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
})

#' randomSplit
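Every write path above now goes through handledCallJMethod instead of callJMethod. That helper is not part of this diff; per the PR title, it catches the JVM exception and reformats its message into an R-friendly one. A minimal sketch of what such a wrapper could look like (the sketch's name and the parsing details are assumptions for illustration, not the PR's actual code):

# Sketch only: wrap callJMethod, catching a JVM-side exception and
# re-raising it with a formatted, R-friendly message of the shape
# asserted in the tests further down.
handledCallJMethodSketch <- function(obj, method, ...) {
  tryCatch(callJMethod(obj, method, ...),
           error = function(e) {
             msg <- conditionMessage(e)
             marker <- "org.apache.spark.sql.AnalysisException: "
             if (grepl(marker, msg, fixed = TRUE)) {
               # Keep only the exception detail, drop the Java class name.
               detail <- strsplit(msg, marker, fixed = TRUE)[[1]][[2]]
               stop(paste0("Error in ", method, " : analysis error - ", detail),
                    call. = FALSE)
             } else {
               stop(msg, call. = FALSE)
             }
           })
}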
17 changes: 9 additions & 8 deletions R/pkg/R/SQLContext.R
@@ -349,7 +349,7 @@ read.json.default <- function(path, ...) {
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
-sdf <- callJMethod(read, "json", paths)
+sdf <- handledCallJMethod(read, "json", paths)
dataFrame(sdf)
}

@@ -421,7 +421,7 @@ read.orc <- function(path, ...) {
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
-sdf <- callJMethod(read, "orc", path)
+sdf <- handledCallJMethod(read, "orc", path)
dataFrame(sdf)
}

@@ -443,7 +443,7 @@ read.parquet.default <- function(path, ...) {
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
-sdf <- callJMethod(read, "parquet", paths)
+sdf <- handledCallJMethod(read, "parquet", paths)
dataFrame(sdf)
}

@@ -495,7 +495,7 @@ read.text.default <- function(path, ...) {
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
-sdf <- callJMethod(read, "text", paths)
+sdf <- handledCallJMethod(read, "text", paths)
dataFrame(sdf)
}

@@ -913,12 +913,13 @@ read.jdbc <- function(url, tableName,
} else {
numPartitions <- numToInt(numPartitions)
}
-sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
-                   numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
+sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
+                          numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
} else if (length(predicates) > 0) {
-sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
+sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)),
+                          jprops)
} else {
-sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
+sdf <- handledCallJMethod(read, "jdbc", url, tableName, jprops)
}
dataFrame(sdf)
}
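With the read paths routed through the same wrapper, a bad input surfaces as a formatted R error rather than a raw JVM stack trace. A hypothetical session (the message shape is the one asserted in the tests below):

# Hypothetical session
err <- tryCatch(read.json("arbitrary_path"),
                error = function(e) conditionMessage(e))
err
# [1] "Error in json : analysis error - Path does not exist: ..."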
44 changes: 32 additions & 12 deletions R/pkg/R/utils.R
@@ -339,20 +339,40 @@ varargsToEnv <- function(...) {
varargsToStrEnv <- function(...) {
pairs <- list(...)
env <- new.env()
-  for (name in names(pairs)) {
-    value <- pairs[[name]]
-    if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
-      stop(paste0("Unsupported type for ", name, " : ", class(value),
-                  ". Supported types are logical, numeric, character and NULL."))
-    }
-    if (is.logical(value)) {
-      env[[name]] <- tolower(as.character(value))
-    } else if (is.null(value)) {
-      env[[name]] <- value
-    } else {
-      env[[name]] <- as.character(value)
+  nameList <- names(pairs)
+  ignoredNames <- list()
+  i <- 1
Member:

Why is i needed here?
Could you elaborate on the changes in this file? Why do we need them?

Member Author:

Oh, sure. Actually, it took me a while to find a clean way, but I ended up introducing another variable here...

value <- pairs[[name]]

fails when name is an empty string (i.e., for non-named arguments), producing the error below:

Error in env[[name]] <- value :
  zero-length variable name

I wanted access to those problematic values so I could print them in

ignoredNames <- append(ignoredNames, pairs[i])
...
warning(paste0("Non-named arguments ignored: ", paste(ignoredNames, collapse = ", "), "."),
         call. = FALSE)

To cut it short, I introduced that variable to access the value when the name is empty. There may be a cleverer way, but I couldn't come up with one.

Member:

I see. One possible alternative is to iterate over a sequence of the list's indices, using lapply instead of for (which is more R-like):

lapply(seq_along(x), function(i) paste(names(x)[[i]], x[[i]]))

Member Author:

That looks nicer. Let me try that!
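A rough sketch of that lapply-based shape applied to this helper (an illustration of the suggestion, not code from this PR; the type checks are omitted for brevity):

# Iterate over indices so unnamed entries can still be reached by position.
varargsToStrEnvSketch <- function(...) {
  pairs <- list(...)
  env <- new.env()
  nameList <- names(pairs)
  ignored <- lapply(seq_along(pairs), function(i) {
    name <- if (is.null(nameList)) "" else nameList[[i]]
    if (identical(name, "")) {
      pairs[[i]]     # unnamed: collect it for the warning
    } else {
      env[[name]] <- as.character(pairs[[i]])
      NULL           # named: stored in env, nothing to collect
    }
  })
  ignored <- Filter(Negate(is.null), ignored)
  if (length(ignored) != 0) {
    warning(paste0("Non-named arguments ignored: ", paste(ignored, collapse = ", "), "."),
            call. = FALSE)
  }
  env
}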


+  if (is.null(nameList)) {
+    # When none of the arguments are named, names(...) returns NULL.
+    ignoredNames <- pairs
+  } else {
+    for (name in names(pairs)) {
+      if (identical(name, "")) {
+        # When only some of the arguments are named, the name of an unnamed one is "".
+        ignoredNames <- append(ignoredNames, pairs[i])
+      } else {
+        value <- pairs[[name]]
+        if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+          stop(paste0("Unsupported type for ", name, " : ", class(value),
+                      ". Supported types are logical, numeric, character and NULL."), call. = FALSE)
+        }
+        if (is.logical(value)) {
+          env[[name]] <- tolower(as.character(value))
+        } else if (is.null(value)) {
+          env[[name]] <- value
+        } else {
+          env[[name]] <- as.character(value)
+        }
+      }
+      i <- i + 1
+    }
+  }

+  if (length(ignoredNames) != 0) {
+    warning(paste0("Non-named arguments ignored: ", paste(ignoredNames, collapse = ", "), "."),
+            call. = FALSE)
+  }
env
}
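Taken together, the new behavior looks roughly like this from the R console (hypothetical session; the exact warning text is what the updated tests assert):

env <- varargsToStrEnv(header = "true", inferSchema = TRUE, "oops", 1)
# Warning message:
# Non-named arguments ignored: oops, 1.
ls(env)                # "header" "inferSchema"
env[["inferSchema"]]   # "true": logical values are lowercased into strings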

16 changes: 16 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2657,6 +2657,14 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
# DataFrameWriter.save() without path.
expect_error(write.df(df, source = "csv"),
"Error in save : illegal argument - 'path' is not specified")
+  expect_error(write.json(df, jsonPath),
+               "Error in json : analysis error - path file:.*already exists")
+  expect_error(write.text(df, jsonPath),
+               "Error in text : analysis error - path file:.*already exists")
+  expect_error(write.orc(df, jsonPath),
+               "Error in orc : analysis error - path file:.*already exists")
+  expect_error(write.parquet(df, jsonPath),
+               "Error in parquet : analysis error - path file:.*already exists")

# Arguments checking in R side.
expect_error(write.df(df, "data.tmp", source = c(1, 2)),
@@ -2676,13 +2684,21 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
"It must be specified manually"))
expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
+  expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
+  expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
+  expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
+  expect_error(read.parquet("arbitrary_path"),
+               "Error in parquet : analysis error - Path does not exist")

# Arguments checking in R side.
expect_error(read.df(path = c(3)),
"path should be charactor, NULL or omitted.")
expect_error(read.df(jsonPath, source = c(1, 2)),
paste("source should be character, NULL or omitted. It is the datasource specified",
"in 'spark.sql.sources.default' configuration by default."))

+  expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"),
+                 "Non-named arguments ignored: 2, 3, a.")
})

unlink(parquetPath)
2 changes: 2 additions & 0 deletions R/pkg/inst/tests/testthat/test_utils.R
@@ -224,6 +224,8 @@ test_that("varargsToStrEnv", {
expect_error(varargsToStrEnv(a = list(1, "a")),
paste0("Unsupported type for a : list. Supported types are logical, ",
"numeric, character and NULL."))
+  expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Non-named arguments ignored: 2, 3, 4.")
+  expect_warning(varargsToStrEnv(1, 2, 3, 4), "Non-named arguments ignored: 1, 2, 3, 4.")
})

sparkR.session.stop()