Commit cb6873e

Merge pull request apache#126 from sun-rui/SPARKR-147

[SPARKR-147] Support multiple directories as input to textFile.

Committed by shivaram on Dec 25, 2014
2 parents: 4d4fc30 + f04c6e0

Showing 5 changed files with 49 additions and 15 deletions.
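In practice the change means textFile() and objectFile() now accept a character vector of paths instead of only a single string. A minimal usage sketch of the new call shape (the file names here are hypothetical):

sc <- sparkR.init()
# One RDD whose elements are the lines of both files
lines <- textFile(sc, c("/data/2014-12-24.log", "/data/2014-12-25.log"))
count(lines)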
30 changes: 17 additions & 13 deletions pkg/R/context.R
@@ -1,13 +1,22 @@
 # context.R: SparkContext driven functions
 
+getMinSplits <- function(sc, minSplits) {
+  if (is.null(minSplits)) {
+    ssc <- .jcall(sc, "Lorg/apache/spark/SparkContext;", "sc")
+    defaultParallelism <- .jcall(ssc, "I", "defaultParallelism")
+    minSplits <- min(defaultParallelism, 2)
+  }
+  as.integer(minSplits)
+}
+
 #' Create an RDD from a text file.
 #'
 #' This function reads a text file from HDFS, a local file system (available on all
 #' nodes), or any Hadoop-supported file system URI, and creates an
 #' RDD of strings from it.
 #'
 #' @param sc SparkContext to use
-#' @param path Path of file to read
+#' @param path Path of file to read. A vector of multiple paths is allowed.
 #' @param minSplits Minimum number of splits to be created. If NULL, the default
 #' value is chosen based on available parallelism.
 #' @return RDD where each item is of type \code{character}
@@ -17,17 +26,10 @@
 #' sc <- sparkR.init()
 #' lines <- textFile(sc, "myfile.txt")
 #'}
-
-getMinSplits <- function(sc, minSplits) {
-  if (is.null(minSplits)) {
-    ssc <- .jcall(sc, "Lorg/apache/spark/SparkContext;", "sc")
-    defaultParallelism <- .jcall(ssc, "I", "defaultParallelism")
-    minSplits <- min(defaultParallelism, 2)
-  }
-  as.integer(minSplits)
-}
-
 textFile <- function(sc, path, minSplits = NULL) {
+  # Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse=",")
+
   jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "textFile", path,
                  getMinSplits(sc, minSplits))
   RDD(jrdd, FALSE)
@@ -39,7 +41,7 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' saveAsObjectFile() of the RDD class.
 #'
 #' @param sc SparkContext to use
-#' @param path Path of file to read
+#' @param path Path of file to read. A vector of multiple paths is allowed.
 #' @param minSplits Minimum number of splits to be created. If NULL, the default
 #' value is chosen based on available parallelism.
 #' @return RDD containing serialized R objects.
@@ -50,8 +52,10 @@ textFile <- function(sc, path, minSplits = NULL) {
 #' sc <- sparkR.init()
 #' rdd <- objectFile(sc, "myfile")
 #'}
-
 objectFile <- function(sc, path, minSplits = NULL) {
+  # Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse=",")
+
   jrdd <- .jcall(sc, "Lorg/apache/spark/api/java/JavaRDD;", "objectFile", path,
                  getMinSplits(sc, minSplits))
   # Assume the RDD contains serialized R objects.
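For reference, the joining step is the whole mechanism: paste(path, collapse=",") folds the vector into a single comma separated string, which Spark's textFile/objectFile endpoints (via Hadoop's FileInputFormat) already treat as a list of inputs. A quick illustration of the join in plain R (the paths are hypothetical):

paths <- c("hdfs:///data/day1", "hdfs:///data/day2")
paste(paths, collapse=",")
# [1] "hdfs:///data/day1,hdfs:///data/day2"

A single path is unaffected, since paste() with collapse returns a one-element vector unchanged.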
16 changes: 16 additions & 0 deletions pkg/inst/tests/test_binaryFile.R
@@ -55,3 +55,19 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
   unlink(fileName2, recursive = TRUE)
 })
 
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+
+  rdd1 <- parallelize(sc, "Spark is pretty.")
+  saveAsObjectFile(rdd1, fileName1)
+  rdd2 <- parallelize(sc, "Spark is awesome.")
+  saveAsObjectFile(rdd2, fileName2)
+
+  rdd <- objectFile(sc, c(fileName1, fileName2))
+  expect_true(count(rdd) == 2)
+
+  unlink(fileName1, recursive = TRUE)
+  unlink(fileName2, recursive = TRUE)
+})
+
14 changes: 14 additions & 0 deletions pkg/inst/tests/test_textFile.R
@@ -108,3 +108,17 @@ test_that("textFile() and saveAsTextFile() word count works as expected", {
   unlink(fileName1)
   unlink(fileName2)
 })
+
+test_that("textFile() on multiple paths", {
+  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines("Spark is pretty.", fileName1)
+  writeLines("Spark is awesome.", fileName2)
+
+  rdd <- textFile(sc, c(fileName1, fileName2))
+  expect_true(count(rdd) == 2)
+
+  unlink(fileName1)
+  unlink(fileName2)
+})
+
2 changes: 1 addition & 1 deletion pkg/man/objectFile.Rd
@@ -8,7 +8,7 @@ objectFile(sc, path, minSplits = NULL)
 \arguments{
 \item{sc}{SparkContext to use}
 
-\item{path}{Path of file to read}
+\item{path}{Path of file to read. A vector of multiple paths is allowed.}
 
 \item{minSplits}{Minimum number of splits to be created. If NULL, the default
 value is chosen based on available parallelism.}
2 changes: 1 addition & 1 deletion pkg/man/textFile.Rd
@@ -7,7 +7,7 @@ textFile(sc, path, minSplits = NULL)
 \arguments{
 \item{sc}{SparkContext to use}
 
-\item{path}{Path of file to read}
+\item{path}{Path of file to read. A vector of multiple paths is allowed.}
 
 \item{minSplits}{Minimum number of splits to be created.
 If NULL, the default value is chosen based on available
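Putting the two documented arguments together: because the paths are joined before the single .jcall, minSplits is requested once for the combined input rather than per path. A hedged sketch of such a call (file names hypothetical):

sc <- sparkR.init()
# Ask for at least 4 splits across the union of both inputs
rdd <- textFile(sc, c("/tmp/part-a.txt", "/tmp/part-b.txt"), minSplits = 4L)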
