SKIPME merged Apache branch-1.6 #126

Merged: 31 commits, Dec 9, 2015
Commits
c54b698
[SPARK-12106][STREAMING][FLAKY-TEST] BatchedWAL test transiently flak…
brkyvz Dec 7, 2015
22781c2
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 7, 2015
3f230f7
[SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
Dec 7, 2015
fed4538
[SPARK-12132] [PYSPARK] raise KeyboardInterrupt inside SIGINT handler
Dec 7, 2015
539914f
[SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitio…
tdas Dec 7, 2015
c8aa5f2
[SPARK-11963][DOC] Add docs for QuantileDiscretizer
yinxusen Dec 7, 2015
cdeb89b
[SPARK-12184][PYTHON] Make python api doc for pivot consistant with s…
aray Dec 7, 2015
115bfbd
[SPARK-12160][MLLIB] Use SQLContext.getOrCreate in MLlib
jkbradley Dec 8, 2015
3c683ed
[SPARK-11551][DOC][EXAMPLE] Replace example code in ml-features.md us…
somideshmukh Dec 8, 2015
8652fc0
[SPARK-10259][ML] Add @since annotation to ml.classification
Dec 8, 2015
5c82169
[SPARK-11958][SPARK-11957][ML][DOC] SQLTransformer user guide and exa…
yanboliang Dec 8, 2015
c9e5274
[SPARK-12103][STREAMING][KAFKA][DOC] document that K means Key and V …
koeninger Dec 8, 2015
870f435
[SPARK-12166][TEST] Unset hadoop related environment in testing
zjffdu Dec 8, 2015
8a791a3
[SPARK-11551][DOC][EXAMPLE] Revert PR #10002
liancheng Dec 8, 2015
c8f9eb7
[SPARK-11652][CORE] Remote code execution with InvokerTransformer
srowen Dec 8, 2015
e3938c3
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 8, 2015
8ef33aa
[SPARK-12201][SQL] add type coercion rule for greatest/least
cloud-fan Dec 8, 2015
9eeb0f2
[SPARK-12195][SQL] Adding BigDecimal, Date and Timestamp into Encoder
gatorsmile Dec 8, 2015
be0fe9b
[SPARK-12188][SQL] Code refactoring and comment correction in Dataset…
gatorsmile Dec 8, 2015
1c8451b
[SPARK-10393] use ML pipeline in LDA example
hhbyyh Dec 8, 2015
9145bfb
[SPARK-12205][SQL] Pivot fails Analysis when aggregate is UnresolvedF…
aray Dec 8, 2015
7e45feb
[SPARK-11605][MLLIB] ML 1.6 QA: API: Java compatibility, docs
hhbyyh Dec 8, 2015
3e31e7e
[SPARK-12159][ML] Add user guide section for IndexToString transformer
BenFradet Dec 8, 2015
25249d1
[SPARK-12187] *MemoryPool classes should not be fully public
Dec 8, 2015
2a5e4d1
[SPARK-12069][SQL] Update documentation with Datasets
marmbrus Dec 8, 2015
b1d5a78
[SPARK-8517][ML][DOC] Reorganizes the spark.ml user guide
thunterdb Dec 9, 2015
9e82273
[SPARK-11343][ML] Documentation of float and double prediction/label …
dahlem Dec 9, 2015
0be792a
[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer …
scwf Dec 9, 2015
b5a76b4
[SPARK-12031][CORE][BUG] Integer overflow when do sampling
uncleGen Dec 9, 2015
acd4624
[SPARK-10299][ML] word2vec should allow users to specify the window size
holdenk Dec 9, 2015
67998a2
Merge branch 'branch-1.6' of github.com:apache/spark into csd-1.6
markhamstra Dec 9, 2015
Diff view
File renamed without changes.
File renamed without changes.
File renamed without changes.

@@ -20,7 +20,7 @@ runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/testthat/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
args = c(jarPath, scriptPath),

@@ -26,7 +26,7 @@ sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

test_that("glm and predict", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
test <- select(training, "Sepal_Length")
model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
prediction <- predict(model, test)
@@ -39,7 +39,7 @@ test_that("glm and predict", {
})

test_that("glm should work with long formula", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
training$LongLongLongLongLongName <- training$Sepal_Width
training$VeryLongLongLongLonLongName <- training$Sepal_Length
training$AnotherLongLongLongLongName <- training$Species
@@ -51,31 +51,31 @@ test_that("glm should work with long formula", {
})

test_that("predictions match with native glm", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("dot minus and intercept vs native glm", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
model <- glm(Sepal_Width ~ . - Species + 0, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("feature interaction vs native glm", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
vals <- collect(select(predict(model, training), "prediction"))
rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
})

test_that("summary coefficients match with native glm", {
training <- createDataFrame(sqlContext, iris)
training <- suppressWarnings(createDataFrame(sqlContext, iris))
stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
coefs <- unlist(stats$coefficients)
devianceResiduals <- unlist(stats$devianceResiduals)
@@ -92,7 +92,7 @@ test_that("summary coefficients match with native glm", {
})

test_that("summary coefficients match with native glm of family 'binomial'", {
df <- createDataFrame(sqlContext, iris)
df <- suppressWarnings(createDataFrame(sqlContext, iris))
training <- filter(df, df$Species != "setosa")
stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
family = "binomial"))

File renamed without changes.

@@ -133,38 +133,45 @@ test_that("create DataFrame from RDD", {
expect_equal(columns(df), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))

df <- jsonFile(sqlContext, jsonPathNa)
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
insertInto(df, "people")
expect_equal(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"), c(16))
expect_equal(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"), c(176.5))

schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
df2 <- createDataFrame(sqlContext, df.toRDD, schema)
df2AsDF <- as.DataFrame(sqlContext, df.toRDD, schema)
df <- read.df(sqlContext, jsonPathNa, "json", schema)
df2 <- createDataFrame(sqlContext, toRDD(df), schema)
df2AsDF <- as.DataFrame(sqlContext, toRDD(df), schema)
expect_equal(columns(df2), c("name", "age", "height"))
expect_equal(columns(df2AsDF), c("name", "age", "height"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(dtypes(df2AsDF), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df2, df2$name == "Bob")), c("Bob", 16, 176.5))
expect_equal(collect(where(df2AsDF, df2$name == "Bob")), c("Bob", 16, 176.5))
expect_equal(as.list(collect(where(df2, df2$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))
expect_equal(as.list(collect(where(df2AsDF, df2AsDF$name == "Bob"))),
list(name = "Bob", age = 16, height = 176.5))

localDF <- data.frame(name=c("John", "Smith", "Sarah"),
age=c(19, 23, 18),
height=c(164.10, 181.4, 173.7))
age=c(19L, 23L, 18L),
height=c(176.5, 181.4, 173.7))
df <- createDataFrame(sqlContext, localDF, schema)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
expect_equal(columns(df), c("name", "age", "height"))
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(collect(where(df, df$name == "John")), c("John", 19, 164.10))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))

ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
sql(hiveCtx, "CREATE TABLE people (name string, age double, height float)")
df <- read.df(hiveCtx, jsonPathNa, "json", schema)
invisible(insertInto(df, "people"))
expect_equal(collect(sql(hiveCtx, "SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql(hiveCtx, "SELECT height from people WHERE name ='Bob'"))$height,
c(176.5))
})

test_that("convert NAs to null type in DataFrames", {
@@ -250,7 +257,7 @@ test_that("create DataFrame from list or data.frame", {
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

irisdf <- createDataFrame(sqlContext, iris)
irisdf <- suppressWarnings(createDataFrame(sqlContext, iris))
iris_collected <- collect(irisdf)
expect_equivalent(iris_collected[,-5], iris[,-5])
expect_equal(iris_collected$Species, as.character(iris$Species))
@@ -463,7 +470,7 @@ test_that("union on two RDDs created from DataFrames returns an RRDD", {
RDD2 <- toRDD(df)
unioned <- unionRDD(RDD1, RDD2)
expect_is(unioned, "RDD")
expect_equal(SparkR:::getSerializedMode(unioned), "byte")
expect_equal(getSerializedMode(unioned), "byte")
expect_equal(collect(unioned)[[2]]$name, "Andy")
})

@@ -485,13 +492,13 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", {

unionByte <- unionRDD(rdd, dfRDD)
expect_is(unionByte, "RDD")
expect_equal(SparkR:::getSerializedMode(unionByte), "byte")
expect_equal(getSerializedMode(unionByte), "byte")
expect_equal(collect(unionByte)[[1]], 1)
expect_equal(collect(unionByte)[[12]]$name, "Andy")

unionString <- unionRDD(textRDD, dfRDD)
expect_is(unionString, "RDD")
expect_equal(SparkR:::getSerializedMode(unionString), "byte")
expect_equal(getSerializedMode(unionString), "byte")
expect_equal(collect(unionString)[[1]], "Michael")
expect_equal(collect(unionString)[[5]]$name, "Andy")
})
@@ -504,7 +511,7 @@ test_that("objectFile() works with row serialization", {
objectIn <- objectFile(sc, objectPath)

expect_is(objectIn, "RDD")
expect_equal(SparkR:::getSerializedMode(objectIn), "byte")
expect_equal(getSerializedMode(objectIn), "byte")
expect_equal(collect(objectIn)[[2]]$age, 30)
})

@@ -849,6 +856,7 @@ test_that("write.df() as parquet file", {
})

test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
@@ -863,10 +871,10 @@ test_that("test HiveContext", {
expect_equal(count(df2), 3)

jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
saveAsTable(df, "json", "json", "append", path = jsonPath2)
df3 <- sql(hiveCtx, "select * from json")
invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 6)
expect_equal(count(df3), 3)
})

test_that("column operators", {
@@ -1311,7 +1319,7 @@ test_that("toJSON() returns an RDD of the correct values", {
df <- jsonFile(sqlContext, jsonPath)
testRDD <- toJSON(df)
expect_is(testRDD, "RDD")
expect_equal(SparkR:::getSerializedMode(testRDD), "string")
expect_equal(getSerializedMode(testRDD), "string")
expect_equal(collect(testRDD)[[1]], mockLines[1])
})

@@ -1641,7 +1649,7 @@ test_that("SQL error message is returned from JVM", {
expect_equal(grepl("Table not found: blah", retError), TRUE)
})

irisDF <- createDataFrame(sqlContext, iris)
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))

test_that("Method as.data.frame as a synonym for collect()", {
expect_equal(as.data.frame(irisDF), collect(irisDF))
@@ -1670,7 +1678,7 @@ test_that("attach() on a DataFrame", {
})

test_that("with() on a DataFrame", {
df <- createDataFrame(sqlContext, iris)
df <- suppressWarnings(createDataFrame(sqlContext, iris))
expect_error(Sepal_Length)
sum1 <- with(df, list(summary(Sepal_Length), summary(Sepal_Width)))
expect_equal(collect(sum1[[1]])[1, "Sepal_Length"], "150")
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions R/pkg/tests/run-all.R
@@ -18,4 +18,7 @@
library(testthat)
library(SparkR)

# Turn all warnings into errors
options("warn" = 2)

test_package("SparkR")
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 $FWDIR/../bin/sparkR --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

if [[ $FAILED != 0 ]]; then
6 changes: 6 additions & 0 deletions bin/spark-class
@@ -71,6 +71,12 @@ fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -253,7 +253,7 @@ private[spark] object RangePartitioner {
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
@@ -262,7 +262,7 @@
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2.toLong).sum
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
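
The sketch() change above — and the matching SamplingUtils change further down — widens the per-partition element count from Int to Long, so a single partition holding more than Int.MaxValue rows no longer overflows the counter. As a rough standalone illustration (not Spark code; the object name is invented), the failure mode is plain Int wrap-around:

```scala
// Toy demonstration of the overflow that the Int -> Long count change guards against.
object IntCounterOverflow {
  def main(args: Array[String]): Unit = {
    var intCount: Int = Int.MaxValue
    intCount += 1                          // silently wraps to Int.MinValue (negative)

    var longCount: Long = Int.MaxValue.toLong
    longCount += 1                         // 2147483648, as intended
    println(s"Int counter: $intCount, Long counter: $longCount")
  }
}
```

Once the count goes negative, any index derived from it (for example rand.nextInt(i)) throws or mis-samples, which is what the Long-based arithmetic avoids.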


@@ -39,7 +39,7 @@ import org.apache.spark.Logging
* @param lock a [[MemoryManager]] instance to synchronize on
* @param poolName a human-readable name for this pool, for use in log messages
*/
class ExecutionMemoryPool(
private[memory] class ExecutionMemoryPool(
lock: Object,
poolName: String
) extends MemoryPool(lock) with Logging {

@@ -27,7 +27,7 @@ import javax.annotation.concurrent.GuardedBy
* to `Object` to avoid programming errors, since this object should only be used for
* synchronization purposes.
*/
abstract class MemoryPool(lock: Object) {
private[memory] abstract class MemoryPool(lock: Object) {

@GuardedBy("lock")
private[this] var _poolSize: Long = 0

@@ -31,7 +31,7 @@ import org.apache.spark.storage.{MemoryStore, BlockStatus, BlockId}
*
* @param lock a [[MemoryManager]] instance to synchronize on
*/
class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {

@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L

@@ -49,7 +49,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
val maxMemory: Long,
private val storageRegionSize: Long,
storageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
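
The memory hunks above make ExecutionMemoryPool, MemoryPool and StorageMemoryPool private[memory] so they drop out of Spark's public API. A minimal sketch of the Scala mechanism being used — a package-qualified access modifier — with invented package and class names:

```scala
// Illustrative only; these packages and classes are not Spark's.
package org.example.memory {
  // Visible only inside org.example.memory, mirroring private[memory] above.
  private[memory] class Pool(val size: Long)

  class Manager {
    private[memory] val pool = new Pool(1024L)   // fine: same package
    def poolSize: Long = pool.size               // public accessor for outside callers
  }
}

package org.example.app {
  object Main {
    def main(args: Array[String]): Unit = {
      val m = new org.example.memory.Manager()
      // new org.example.memory.Pool(1L)          // would not compile: Pool is private[memory]
      println(s"pool size seen from outside: ${m.poolSize}")
    }
  }
}
```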

@@ -398,7 +398,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
override def readUTF(): String = input.readString() // readString in kryo does utf8
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
override def skipBytes(n: Int): Int = {
var remaining: Long = n
while (remaining > 0) {
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
input.skip(skip)
remaining -= skip
}
n
}
override def readFully(b: Array[Byte]): Unit = input.read(b)
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
override def readLine(): String = throw new UnsupportedOperationException("readLine")
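
The skipBytes override above avoids asking the underlying input to skip more than Integer.MAX_VALUE bytes in one call by looping over Int-sized chunks. A hedged, non-Spark sketch of the same pattern against a plain java.io.InputStream (the helper name is invented):

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Skip a potentially large byte count in Int-sized chunks rather than one huge request.
object ChunkedSkip {
  def skipFully(in: InputStream, count: Long): Unit = {
    var remaining = count
    while (remaining > 0) {
      val skipped = in.skip(math.min(Int.MaxValue.toLong, remaining))
      if (skipped <= 0) return             // stream could not skip any further
      remaining -= skipped
    }
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream(Array.fill[Byte](16)(1))
    skipFully(in, 10L)
    println(s"bytes consumed after skip: ${16 - in.available()}")   // prints 10
  }
}
```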

@@ -34,7 +34,7 @@ private[spark] object SamplingUtils {
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Int) = {
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
@@ -52,16 +52,17 @@
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
val replacementIndex = rand.nextInt(i)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex) = item
reservoir(replacementIndex.toInt) = item
}
i += 1
l += 1
}
(reservoir, i)
(reservoir, l)
}
}
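
The reservoirSampleAndCount changes above keep the running element count in a Long and draw the replacement index from rand.nextDouble() instead of nextInt, so inputs longer than Int.MaxValue are sampled correctly. A self-contained sketch of the same idea under stated assumptions (plain scala.util.Random in place of Spark's XORShiftRandom, invented object name):

```scala
import scala.util.Random

// Reservoir sampling with a Long element counter, so the replacement index
// stays valid even past Int.MaxValue items.
object ReservoirSketch {
  def sample[T: scala.reflect.ClassTag](input: Iterator[T], k: Int, seed: Long = 42L): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    val rand = new Random(seed)
    var seen = 0L                                        // Long, not Int
    while (input.hasNext) {
      val item = input.next()
      if (seen < k) {
        reservoir(seen.toInt) = item                     // fill phase
      } else {
        val j = (rand.nextDouble() * (seen + 1)).toLong  // uniform in [0, seen]
        if (j < k) reservoir(j.toInt) = item             // keep with probability k / (seen + 1)
      }
      seen += 1
    }
    (if (seen < k) reservoir.take(seen.toInt) else reservoir, seen)
  }

  def main(args: Array[String]): Unit = {
    val (s, n) = sample(Iterator.range(0, 1000), 5)
    println(s"sampled ${s.mkString(", ")} out of $n items")
  }
}
```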
