Merge branch 'master' of github.com:apache/spark into SPARK-23179
mgaido91 committed Feb 7, 2018
2 parents 2c8e2c7 + c36fecc commit bd8b645
Showing 403 changed files with 9,413 additions and 2,922 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -62,6 +62,8 @@ project/plugins/src_managed/
project/plugins/target/
python/lib/pyspark.zip
python/deps
python/test_coverage/coverage_data
python/test_coverage/htmlcov
python/pyspark/python
reports/
scalastyle-on-compile.generated.xml
3 changes: 2 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -2090,7 +2090,8 @@ setMethod("selectExpr",
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
#' @param col a Column expression, or an atomic vector in the length of 1 as literal value.
#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
#' vector in the length of 1 as literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
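For illustration, a minimal SparkR sketch of the two accepted forms of col described in the doc change above; the data and column names are assumptions, not part of the commit:

df  <- createDataFrame(data.frame(x = 1:3))
df2 <- withColumn(df, "twice", df$x * 2)  # a Column expression built from this SparkDataFrame
df3 <- withColumn(df, "flag", TRUE)       # a length-1 atomic vector used as a literal value
head(df3)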
4 changes: 3 additions & 1 deletion R/pkg/R/functions.R
@@ -1026,7 +1026,9 @@ setMethod("last_day",
})

#' @details
#' \code{length}: Computes the length of a given string or binary column.
#' \code{length}: Computes the character length of a string data or number of bytes
#' of a binary data. The length of string data includes the trailing spaces.
#' The length of binary data includes binary zeros.
#'
#' @rdname column_string_functions
#' @aliases length length,Column-method
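A hedged illustration of the clarified semantics above (trailing spaces count toward the length of string data), assuming a SparkR session and sample data:

df <- createDataFrame(data.frame(s = c("abc", "ab  "), stringsAsFactors = FALSE))
head(select(df, length(df$s)))  # 3 and 4: the two trailing spaces are included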
15 changes: 14 additions & 1 deletion R/pkg/R/mllib_classification.R
@@ -279,11 +279,24 @@ function(object, path, overwrite = FALSE) {
#' savedModel <- read.ml(path)
#' summary(savedModel)
#'
#' # multinomial logistic regression
#' # binary logistic regression against two classes with
#' # upperBoundsOnCoefficients and upperBoundsOnIntercepts
#' ubc <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
#' model <- spark.logit(training, Species ~ .,
#' upperBoundsOnCoefficients = ubc,
#' upperBoundsOnIntercepts = 1.0)
#'
#' # multinomial logistic regression
#' model <- spark.logit(training, Class ~ ., regParam = 0.5)
#' summary <- summary(model)
#'
#' # multinomial logistic regression with
#' # lowerBoundsOnCoefficients and lowerBoundsOnIntercepts
#' lbc <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
#' lbi <- as.array(c(0.0, 0.0))
#' model <- spark.logit(training, Species ~ ., family = "multinomial",
#' lowerBoundsOnCoefficients = lbc,
#' lowerBoundsOnIntercepts = lbi)
#' }
#' @note spark.logit since 2.1.0
setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
11 changes: 6 additions & 5 deletions R/pkg/R/serialize.R
@@ -30,14 +30,17 @@
# POSIXct,POSIXlt -> Time
#
# list[T] -> Array[T], where T is one of above mentioned types
# Multi-element vector of any of the above (except raw) -> Array[T]
# environment -> Map[String, T], where T is a native type
# jobj -> Object, where jobj is an object created in the backend
# nolint end

getSerdeType <- function(object) {
type <- class(object)[[1]]
if (type != "list") {
type
if (is.atomic(object) && !is.raw(object) && length(object) > 1) {
"array"
} else if (type != "list") {
type
} else {
# Check if all elements are of same type
elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
@@ -50,9 +53,7 @@ getSerdeType <- function(object) {
}

writeObject <- function(con, object, writeType = TRUE) {
# NOTE: In R vectors have same type as objects. So we don't support
# passing in vectors as arrays and instead require arrays to be passed
# as lists.
# NOTE: In R vectors have same type as objects
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since 'is.na' only handles atomic vectors,
# lists and pairlists
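A standalone sketch of the dispatch rule introduced above, not the SparkR internals themselves: multi-element atomic vectors (other than raw) now serialize as arrays, while scalars and lists keep their existing handling.

serdeTypeOf <- function(object) {
  if (is.atomic(object) && !is.raw(object) && length(object) > 1) {
    "array"               # multi-element atomic vector -> Array[T] on the JVM side
  } else if (class(object)[[1]] != "list") {
    class(object)[[1]]    # scalars keep their own type tag
  } else {
    "list"                # lists are inspected element by element in the real code
  }
}
serdeTypeOf(1:3)          # "array"
serdeTypeOf(42L)          # "integer"
serdeTypeOf(as.raw(1:3))  # "raw"
serdeTypeOf(list(1, 2))   # "list"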
47 changes: 47 additions & 0 deletions R/pkg/tests/fulltests/test_Serde.R
@@ -37,6 +37,53 @@ test_that("SerDe of primitive types", {
expect_equal(class(x), "character")
})

test_that("SerDe of multi-element primitive vectors inside R data.frame", {
# vector of integers embedded in R data.frame
indices <- 1L:3L
myDf <- data.frame(indices)
myDf$data <- list(rep(0L, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0L, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "integer")

# vector of numeric embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(0, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(0, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "numeric")

# vector of logical embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep(TRUE, 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep(TRUE, 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "logical")

# vector of character embedded in R data.frame
myDf <- data.frame(indices)
myDf$data <- list(rep("abc", 3L))
mySparkDf <- as.DataFrame(myDf)
myResultingDf <- collect(mySparkDf)
myDfListedData <- data.frame(indices)
myDfListedData$data <- list(as.list(rep("abc", 3L)))
expect_equal(myResultingDf, myDfListedData)
expect_equal(class(myResultingDf[["data"]][[1]]), "list")
expect_equal(class(myResultingDf[["data"]][[1]][[1]]), "character")
})

test_that("SerDe of list of primitive types", {
x <- list(1L, 2L, 3L)
y <- callJStatic("SparkRHandler", "echo", x)
10 changes: 5 additions & 5 deletions R/pkg/tests/fulltests/test_mllib_classification.R
@@ -124,7 +124,7 @@ test_that("spark.logit", {
# Petal.Width 0.42122607
# nolint end

# Test multinomial logistic regression againt three classes
# Test multinomial logistic regression against three classes
df <- suppressWarnings(createDataFrame(iris))
model <- spark.logit(df, Species ~ ., regParam = 0.5)
summary <- summary(model)
@@ -196,7 +196,7 @@ test_that("spark.logit", {
#
# nolint end

# Test multinomial logistic regression againt two classes
# Test multinomial logistic regression against two classes
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
@@ -208,7 +208,7 @@ test_that("spark.logit", {
expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))

# Test binomial logistic regression againt two classes
# Test binomial logistic regression against two classes
model <- spark.logit(training, Species ~ ., regParam = 0.5)
summary <- summary(model)
coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
@@ -239,7 +239,7 @@ test_that("spark.logit", {
prediction2 <- collect(select(predict(model2, df2), "prediction"))
expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))

# Test binomial logistic regression againt two classes with upperBoundsOnCoefficients
# Test binomial logistic regression against two classes with upperBoundsOnCoefficients
# and upperBoundsOnIntercepts
u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
@@ -252,7 +252,7 @@ test_that("spark.logit", {
expect_error(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = as.array(c(1, 2)),
upperBoundsOnIntercepts = 1.0))

# Test binomial logistic regression againt two classes with lowerBoundsOnCoefficients
# Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
@@ -171,7 +171,9 @@ private class DownloadCallback implements StreamCallback {

@Override
public void onData(String streamId, ByteBuffer buf) throws IOException {
channel.write(buf);
while (buf.hasRemaining()) {
channel.write(buf);
}
}

@Override
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,6 +118,18 @@ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executo
onEvent(executorBlacklisted);
}

@Override
public void onExecutorBlacklistedForStage(
SparkListenerExecutorBlacklistedForStage executorBlacklistedForStage) {
onEvent(executorBlacklistedForStage);
}

@Override
public void onNodeBlacklistedForStage(
SparkListenerNodeBlacklistedForStage nodeBlacklistedForStage) {
onEvent(nodeBlacklistedForStage);
}

@Override
public final void onExecutorUnblacklisted(
SparkListenerExecutorUnblacklisted executorUnblacklisted) {
@@ -172,10 +172,7 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
@@ -185,6 +182,11 @@
if (got >= required) {
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
@@ -32,6 +32,8 @@ public abstract class RecordComparator {
public abstract int compare(
Object leftBaseObject,
long leftBaseOffset,
int leftBaseLength,
Object rightBaseObject,
long rightBaseOffset);
long rightBaseOffset,
int rightBaseLength);
}
@@ -62,12 +62,13 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {
int uaoSize = UnsafeAlignedOffset.getUaoSize();
if (prefixComparisonResult == 0) {
final Object baseObject1 = memoryManager.getPage(r1.recordPointer);
// skip length
final long baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + uaoSize;
final int baseLength1 = UnsafeAlignedOffset.getSize(baseObject1, baseOffset1 - uaoSize);
final Object baseObject2 = memoryManager.getPage(r2.recordPointer);
// skip length
final long baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + uaoSize;
return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);
final int baseLength2 = UnsafeAlignedOffset.getSize(baseObject2, baseOffset2 - uaoSize);
return recordComparator.compare(baseObject1, baseOffset1, baseLength1, baseObject2,
baseOffset2, baseLength2);
} else {
return prefixComparisonResult;
}
@@ -35,8 +35,8 @@ final class UnsafeSorterSpillMerger {
prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
if (prefixComparisonResult == 0) {
return recordComparator.compare(
left.getBaseObject(), left.getBaseOffset(),
right.getBaseObject(), right.getBaseOffset());
left.getBaseObject(), left.getBaseOffset(), left.getRecordLength(),
right.getBaseObject(), right.getBaseOffset(), right.getRecordLength());
} else {
return prefixComparisonResult;
}
@@ -76,8 +76,10 @@ public UnsafeSorterSpillReader(
SparkEnv.get() == null ? 0.5 :
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);

// SPARK-23310: Disable read-ahead input stream, because it is causing lock contention and perf
// regression for TPC-DS queries.
final boolean readAheadEnabled = SparkEnv.get() != null &&
SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", false);

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
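If the previous read-ahead behaviour is wanted back, the flag can presumably be set at session start; a hedged SparkR sketch (the key is an internal, undocumented knob taken from the diff above):

sparkR.session(sparkConfig = list(spark.unsafe.sorter.spill.read.ahead.enabled = "true"))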
@@ -108,7 +108,12 @@ $(document).ready(function() {
requestedIncomplete = getParameterByName("showIncomplete", searchString);
requestedIncomplete = (requestedIncomplete == "true" ? true : false);

$.getJSON("api/v1/applications?limit=" + appLimit, function(response,status,jqXHR) {
appParams = {
limit: appLimit,
status: (requestedIncomplete ? "running" : "completed")
};

$.getJSON("api/v1/applications", appParams, function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
51 changes: 27 additions & 24 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -43,17 +43,19 @@ object Partitioner {
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
* If any of the RDDs already has a partitioner, and the number of partitions of the
* partitioner is either greater than or is less than and within a single order of
* magnitude of the max number of upstream partitions, choose that one.
* If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism
* as the default partitions number, otherwise we'll use the max number of upstream partitions.
*
* Otherwise, we use a default HashPartitioner. For the number of partitions, if
* spark.default.parallelism is set, then we'll use the value from SparkContext
* defaultParallelism, otherwise we'll use the max number of upstream partitions.
* When available, we choose the partitioner from rdds with maximum number of partitions. If this
* partitioner is eligible (number of partitions within an order of maximum number of partitions
* in rdds), or has partition number higher than default partitions number - we use this
* partitioner.
*
* Unless spark.default.parallelism is set, the number of partitions will be the
* same as the number of partitions in the largest upstream RDD, as this should
* be least likely to cause out-of-memory errors.
* Otherwise, we'll use a new HashPartitioner with the default partitions number.
*
* Unless spark.default.parallelism is set, the number of partitions will be the same as the
* number of partitions in the largest upstream RDD, as this should be least likely to cause
* out-of-memory errors.
*
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
@@ -67,31 +69,32 @@ object Partitioner {
None
}

if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}

// If the existing max partitioner is an eligible one, or its partitions number is larger
// than the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
new HashPartitioner(defaultNumPartitions)
}
}

/**
* Returns true if the number of partitions of the RDD is either greater
* than or is less than and within a single order of magnitude of the
* max number of upstream partitions;
* otherwise, returns false
* Returns true if the number of partitions of the RDD is either greater than or is less than and
* within a single order of magnitude of the max number of upstream partitions, otherwise returns
* false.
*/
private def isEligiblePartitioner(
hasMaxPartitioner: Option[RDD[_]],
hasMaxPartitioner: RDD[_],
rdds: Seq[RDD[_]]): Boolean = {
if (hasMaxPartitioner.isEmpty) {
return false
}
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
}

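The eligibility rule above can be restated numerically; a small R sketch with assumed partition counts:

# eligible when the existing partitioner is within one order of magnitude of the max
log10(1000) - log10(150) < 1  # TRUE:  150 partitions vs. a 1000-partition upstream RDD
log10(1000) - log10(50)  < 1  # FALSE: 50 is more than an order of magnitude smaller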
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2276,7 +2276,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Clean a closure to make it ready to be serialized and send to tasks
* Clean a closure to make it ready to be serialized and sent to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>