Merge remote-tracking branch 'upstream/master' into constraint-cast
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
	sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
viirya committed Mar 31, 2016
2 parents 8e8dd72 + 26445c2 commit ec84c5f
Showing 398 changed files with 18,274 additions and 5,939 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -238,6 +238,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
(BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://www.antlr.org/)
(BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
(BSD licence) ANTLR StringTemplate (org.antlr:stringtemplate:3.2.1 - http://www.stringtemplate.org)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
1 change: 0 additions & 1 deletion NOTICE
@@ -48,7 +48,6 @@ Eclipse Public License 1.0

The following components are provided under the Eclipse Public License 1.0. See project link for details.

(Eclipse Public License - Version 1.0) mqtt-client (org.eclipse.paho:mqtt-client:0.4.0 - http://www.eclipse.org/paho/mqtt-client)
(Eclipse Public License v1.0) Eclipse JDT Core (org.eclipse.jdt:core:3.1.1 - http://www.eclipse.org/jdt/)

========================================================================
3 changes: 2 additions & 1 deletion R/pkg/DESCRIPTION
@@ -12,7 +12,8 @@ Depends:
methods,
Suggests:
testthat,
e1071
e1071,
survival
Description: R frontend for Spark
License: Apache License (== 2.0)
Collate:
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -16,7 +16,8 @@ exportMethods("glm",
"summary",
"kmeans",
"fitted",
"naiveBayes")
"naiveBayes",
"survreg")

# Job group lifecycle management methods
export("setJobGroup",
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1179,3 +1179,7 @@ setGeneric("fitted")
#' @rdname naiveBayes
#' @export
setGeneric("naiveBayes", function(formula, data, ...) { standardGeneric("naiveBayes") })

#' @rdname survreg
#' @export
setGeneric("survreg", function(formula, data, ...) { standardGeneric("survreg") })
75 changes: 75 additions & 0 deletions R/pkg/R/mllib.R
@@ -27,6 +27,11 @@ setClass("PipelineModel", representation(model = "jobj"))
#' @export
setClass("NaiveBayesModel", representation(jobj = "jobj"))

#' @title S4 class that represents an AFTSurvivalRegressionModel
#' @param jobj a Java object reference to the backing Scala AFTSurvivalRegressionWrapper
#' @export
setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj"))

#' Fits a generalized linear model
#'
#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
@@ -273,3 +278,73 @@ setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"),
formula, data@sdf, laplace)
return(new("NaiveBayesModel", jobj = jobj))
})

#' Fit an accelerated failure time (AFT) survival regression model.
#'
#' Fit an accelerated failure time (AFT) survival regression model, similarly to R's survreg().
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', ':', '+', and '-'.
#' Note that operator '.' is not supported currently.
#' @param data DataFrame for training.
#' @return a fitted AFT survival regression model
#' @rdname survreg
#' @seealso survival: \url{https://cran.r-project.org/web/packages/survival/}
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(sqlContext, ovarian)
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, df)
#' }
setMethod("survreg", signature(formula = "formula", data = "DataFrame"),
function(formula, data, ...) {
formula <- paste(deparse(formula), collapse = "")
jobj <- callJStatic("org.apache.spark.ml.r.AFTSurvivalRegressionWrapper",
"fit", formula, data@sdf)
return(new("AFTSurvivalRegressionModel", jobj = jobj))
})

#' Get the summary of an AFT survival regression model
#'
#' Returns the summary of an AFT survival regression model produced by survreg(),
#' similarly to R's summary().
#'
#' @param object a fitted AFT survival regression model
#' @return a list containing \code{coefficients}: the model's coefficients, intercept and log(scale).
#' @rdname summary
#' @export
#' @examples
#' \dontrun{
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, trainingData)
#' summary(model)
#' }
setMethod("summary", signature(object = "AFTSurvivalRegressionModel"),
function(object, ...) {
jobj <- object@jobj
features <- callJMethod(jobj, "rFeatures")
coefficients <- callJMethod(jobj, "rCoefficients")
coefficients <- as.matrix(unlist(coefficients))
colnames(coefficients) <- c("Value")
rownames(coefficients) <- unlist(features)
return(list(coefficients = coefficients))
})

#' Make predictions from an AFT survival regression model
#'
#' Make predictions from a model produced by survreg(), similarly to R package survival's predict.
#'
#' @param object A fitted AFT survival regression model
#' @param newData DataFrame for testing
#' @return DataFrame containing predicted labels in a column named "prediction"
#' @rdname predict
#' @export
#' @examples
#' \dontrun{
#' model <- survreg(Surv(futime, fustat) ~ ecog_ps + rx, trainingData)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#' }
setMethod("predict", signature(object = "AFTSurvivalRegressionModel"),
function(object, newData) {
return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf)))
})
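
For context: survreg() above delegates through org.apache.spark.ml.r.AFTSurvivalRegressionWrapper to Spark ML's AFTSurvivalRegression estimator. The following is a minimal Java sketch of that underlying estimator — the surrounding class and column names are illustrative assumptions, not part of this commit:

import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.sql.DataFrame;

public class AftSketch {
  // 'training' needs a vector "features" column, a "label" column holding the
  // survival time, and a "censor" column (1.0 = event observed, 0.0 = censored).
  public static DataFrame fitAndPredict(DataFrame training, DataFrame test) {
    AFTSurvivalRegression aft = new AFTSurvivalRegression()
        .setLabelCol("label")
        .setCensorCol("censor");
    AFTSurvivalRegressionModel model = aft.fit(training);
    // As in the R wrapper, transform() appends a "prediction" column.
    return model.transform(test);
  }
}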
49 changes: 49 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -200,3 +200,52 @@ test_that("naiveBayes", {
expect_equal(as.character(predict(m, t1[1, ])), "Yes")
}
})

test_that("survreg", {
# R code to reproduce the result.
#
#' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
#' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
#' library(survival)
#' model <- survreg(Surv(time, status) ~ x + sex, rData)
#' summary(model)
#' predict(model, rData)
#
# -- output of 'summary(model)'
#
# Value Std. Error z p
# (Intercept) 1.315 0.270 4.88 1.07e-06
# x -0.190 0.173 -1.10 2.72e-01
# sex -0.253 0.329 -0.77 4.42e-01
# Log(scale) -1.160 0.396 -2.93 3.41e-03
#
# -- output of 'predict(model, rData)'
#
# 1 2 3 4 5 6 7
# 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
#
data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
df <- createDataFrame(sqlContext, data, c("time", "status", "x", "sex"))
model <- survreg(Surv(time, status) ~ x + sex, df)
stats <- summary(model)
coefs <- as.vector(stats$coefficients[, 1])
rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
expect_equal(coefs, rCoefs, tolerance = 1e-4)
expect_true(all(
rownames(stats$coefficients) ==
c("(Intercept)", "x", "sex", "Log(scale)")))
p <- collect(select(predict(model, df), "prediction"))
expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
2.390146, 2.891269, 2.891269), tolerance = 1e-4)

# Test survival::survreg
if (requireNamespace("survival", quietly = TRUE)) {
rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
expect_that(
model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
not(throws_error()))
expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)
}
})
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_rdd.R
@@ -791,3 +791,11 @@ test_that("sampleByKey() on pairwise RDDs", {
expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
})

test_that("Test correct concurrency of RRDD.compute()", {
rdd <- parallelize(sc, 1:1000, 100)
jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row")
zrdd <- callJMethod(jrdd, "zip", jrdd)
count <- callJMethod(zrdd, "count")
expect_equal(count, 1000)
})
6 changes: 5 additions & 1 deletion bin/pyspark
@@ -49,7 +49,11 @@ if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
# If IPython options are specified, assume user wants to run IPython
# (for backwards-compatibility)
PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
PYSPARK_DRIVER_PYTHON="ipython"
if [ -x "$(command -v jupyter)" ]; then
PYSPARK_DRIVER_PYTHON="jupyter"
else
PYSPARK_DRIVER_PYTHON="ipython"
fi
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi
@@ -123,7 +123,10 @@ public TransportClientFactory(
public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
// Get connection from the connection pool first.
// If it is not found or not active, create a new one.
long preResolveHost = System.nanoTime();
final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);

// Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(address);
@@ -235,7 +238,7 @@ public void initChannel(SocketChannel ch) {
}
long postBootstrap = System.nanoTime();

logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

return client;
@@ -24,6 +24,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.*;
import org.slf4j.Logger;
@@ -118,7 +119,7 @@ protected void serviceInit(Configuration conf) {
// an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back
registeredExecutorFile =
findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));

TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
// If authentication is enabled, set up the shuffle server to use a
@@ -191,12 +192,12 @@ public void stopContainer(ContainerTerminationContext context) {

private File findRegisteredExecutorFile(String[] localDirs) {
for (String dir: localDirs) {
File f = new File(dir, "registeredExecutors.ldb");
File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
if (f.exists()) {
return f;
}
}
return new File(localDirs[0], "registeredExecutors.ldb");
return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
}

/**
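
The two YarnShuffleService changes above guard against YARN handing back local dirs as a comma-separated list with stray whitespace and/or URI-style entries (e.g. file:///mnt/disk1), which java.io.File would treat as a relative path. A hedged sketch of both behaviors, assuming Hadoop's Configuration and Path APIs:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class LocalDirsDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // A list with whitespace and a URI-style entry, as YARN may supply.
    conf.set("yarn.nodemanager.local-dirs", "file:///mnt/disk1, /mnt/disk2");

    // getStrings() splits on commas but keeps the leading space: " /mnt/disk2".
    for (String d : conf.getStrings("yarn.nodemanager.local-dirs")) {
      System.out.println("[" + d + "]");
    }
    // getTrimmedStrings() strips the whitespace: "/mnt/disk2".
    for (String d : conf.getTrimmedStrings("yarn.nodemanager.local-dirs")) {
      System.out.println("[" + d + "]");
    }

    // java.io.File keeps the scheme as literal path text: "file:/mnt/disk1/...".
    System.out.println(new File("file:///mnt/disk1", "registeredExecutors.ldb"));
    // Path.toUri().getPath() recovers the plain filesystem path: "/mnt/disk1/...".
    System.out.println(new File(new Path("file:///mnt/disk1").toUri().getPath(),
        "registeredExecutors.ldb"));
  }
}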
28 changes: 28 additions & 0 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe;

import java.lang.reflect.Field;
import java.lang.reflect.Method;

import sun.misc.Unsafe;

@@ -37,6 +38,33 @@ public final class Platform {

public static final int DOUBLE_ARRAY_OFFSET;

private static final boolean unaligned;
static {
boolean _unaligned;
// use reflection to access unaligned field
try {
Class<?> bitsClass =
Class.forName("java.nio.Bits", false, ClassLoader.getSystemClassLoader());
Method unalignedMethod = bitsClass.getDeclaredMethod("unaligned");
unalignedMethod.setAccessible(true);
_unaligned = Boolean.TRUE.equals(unalignedMethod.invoke(null));
} catch (Throwable t) {
// We at least know x86 and x64 support unaligned access.
String arch = System.getProperty("os.arch", "");
//noinspection DynamicRegexReplaceableByCompiledPattern
_unaligned = arch.matches("^(i[3-6]86|x86(_64)?|x64|amd64)$");
}
unaligned = _unaligned;
}

/**
 * @return true if the running JVM has sun.misc.Unsafe available and the underlying
 * system supports unaligned memory access.
*/
public static boolean unaligned() {
return unaligned;
}

public static int getInt(Object object, long offset) {
return _UNSAFE.getInt(object, offset);
}
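
A hedged usage sketch for the new flag — Platform.getLong() and Platform.BYTE_ARRAY_OFFSET already exist in this class, while the helper itself is hypothetical: hot paths can branch on Platform.unaligned() to read a word at a time, falling back to byte-wise access on architectures that fault on unaligned loads.

import org.apache.spark.unsafe.Platform;

public class UnalignedDemo {
  // Hypothetical helper: compare 8 bytes at arbitrary, possibly unaligned, offsets.
  static boolean eightBytesEqual(byte[] a, int aOff, byte[] b, int bOff) {
    if (Platform.unaligned()) {
      // One 8-byte load per side; safe only where unaligned access is supported.
      return Platform.getLong(a, Platform.BYTE_ARRAY_OFFSET + aOff)
          == Platform.getLong(b, Platform.BYTE_ARRAY_OFFSET + bOff);
    }
    // Portable fallback: one byte at a time.
    for (int i = 0; i < 8; i++) {
      if (a[aOff + i] != b[bOff + i]) {
        return false;
      }
    }
    return true;
  }
}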
13 changes: 0 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -36,19 +36,6 @@ public class StorageLevels {
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
* @param useMemory saved to memory, if true
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
@Deprecated
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
}

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
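
For callers of the removed overload, a hedged migration sketch — the remaining five-argument factory method (whose javadoc begins above) takes the off-heap flag explicitly:

import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.storage.StorageLevel;

public class StorageLevelMigration {
  public static void main(String[] args) {
    // Before (removed 4-arg overload): StorageLevels.create(true, true, true, 1);
    // After — useDisk, useMemory, useOffHeap, deserialized, replication:
    StorageLevel level = StorageLevels.create(true, true, false, true, 1);
    System.out.println(level.description());
  }
}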
(Diff truncated — the remaining changed files are not shown.)
