Skip to content

Commit

Permalink
Merge with upstream master.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sun Rui committed Jan 30, 2015
2 parents cd2a5b3 + 52356b6 commit 2814caa
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 22 deletions.
48 changes: 35 additions & 13 deletions pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,42 @@ sparkR.onLoad <- function(libname, pkgname) {
.sparkREnv$assemblyJarPath <- assemblyJarPath
}

#' Stop this Spark context
#' Also terminates the backend this R session is connected to
sparkR.stop <- function(sparkREnv) {
if (exists(".sparkRjsc", envir = .sparkREnv)) {
sc <- get(".sparkRjsc", envir = .sparkREnv)
callJMethod(sc, "stop")
# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
tryCatch({
exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
}, error = function(err) {
return(FALSE)
})
}

# Stop the Spark context.
# Also terminates the backend this R session is connected to
sparkR.stop <- function(env) {
cat("Stopping SparkR\n")

if (!connExists(env)) {
# When the workspace is saved in R, the connections are closed
# *before* the finalizer is run. In these cases, we reconnect
# to the backend, so we can shut it down.
connectBackend("localhost", .sparkREnv$sparkRBackendPort)
}

if (exists(".sparkRCon", envir = .sparkREnv)) {
callJStatic("SparkRHandler", "stopBackend")
# Also close the connection and remove it from our env
conn <- get(".sparkRCon", .sparkREnv)
close(conn)
rm(".sparkRCon", envir = .sparkREnv)
if (exists(".sparkRjsc", envir = env)) {
sc <- get(".sparkRjsc", envir = env)
callJMethod(sc, "stop")
}

callJStatic("SparkRHandler", "stopBackend")
# Also close the connection and remove it from our env
conn <- get(".sparkRCon", env)
close(conn)
rm(".sparkRCon", envir = env)

# Finally, sleep for 1 sec to let backend finish exiting.
# Without this we get port conflicts in RStudio when we try to 'Restart R'.
Sys.sleep(1)
}

#' Initialize a new Spark Context.
Expand Down Expand Up @@ -79,7 +100,8 @@ sparkR.init <- function(
args = as.character(sparkRBackendPort),
javaOpts = paste("-Xmx", sparkMem, sep = ""))
Sys.sleep(2) # Wait for backend to come up
connectBackend("localhost", 12345) # Connect to it
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
connectBackend("localhost", sparkRBackendPort) # Connect to it

if (nchar(sparkHome) != 0) {
sparkHome <- normalizePath(sparkHome)
Expand Down
3 changes: 2 additions & 1 deletion pkg/R/sparkRBackend.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ isRemoveMethod <- function(isStatic, objId, methodName) {
# methodName - name of method to be invoked
invokeJava <- function(isStatic, objId, methodName, ...) {
if (!exists(".sparkRCon", .sparkREnv)) {
stop("No connection to backend found")
stop("No connection to backend found. Please re-run sparkR.init")
}

# If this isn't a removeJObject call
Expand All @@ -62,6 +62,7 @@ invokeJava <- function(isStatic, objId, methodName, ...) {
}
}


rc <- rawConnection(raw(0), "r+")

writeBoolean(rc, isStatic)
Expand Down
14 changes: 6 additions & 8 deletions pkg/inst/tests/test_parallelize_collect.R
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,12 @@ test_that("collect(), following a parallelize(), gives back the original collect
})

test_that("regression: collect() following a parallelize() does not drop elements", {
lapply(1:72,
function(collLen) {
lapply(1:15, function(numPart) {
expected <- runif(collLen)
actual <- collect(parallelize(jsc, expected, numPart))
expect_equal(actual, as.list(expected))
})
})
# 10 %/% 6 = 1, ceiling(10 / 6) = 2
collLen <- 10
numPart <- 6
expected <- runif(collLen)
actual <- collect(parallelize(jsc, expected, numPart))
expect_equal(actual, as.list(expected))
})

test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
Expand Down

0 comments on commit 2814caa

Please sign in to comment.