Skip to content

Commit

Permalink
Merge pull request apache#263 from palantir/rk/resync
Browse files Browse the repository at this point in the history
Merge upstream apache
  • Loading branch information
robert3005 authored Sep 23, 2017
2 parents 2817552 + 29c4250 commit 8d0d856
Show file tree
Hide file tree
Showing 171 changed files with 8,278 additions and 1,291 deletions.
40 changes: 27 additions & 13 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -986,10 +986,10 @@ setMethod("unique",
#' @param x A SparkDataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @param seed Randomness seed value
#' @param seed Randomness seed value. Default is a random seed.
#'
#' @family SparkDataFrame functions
#' @aliases sample,SparkDataFrame,logical,numeric-method
#' @aliases sample,SparkDataFrame-method
#' @rdname sample
#' @name sample
#' @export
Expand All @@ -998,33 +998,47 @@ setMethod("unique",
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' collect(sample(df, fraction = 0.5))
#' collect(sample(df, FALSE, 0.5))
#' collect(sample(df, TRUE, 0.5))
#' collect(sample(df, TRUE, 0.5, seed = 3))
#'}
#' @note sample since 1.4.0
setMethod("sample",
signature(x = "SparkDataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction, seed) {
if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
signature(x = "SparkDataFrame"),
function(x, withReplacement = FALSE, fraction, seed) {
if (!is.numeric(fraction)) {
stop(paste("fraction must be numeric; however, got", class(fraction)))
}
if (!is.logical(withReplacement)) {
stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
}

if (!missing(seed)) {
if (is.null(seed)) {
stop("seed must not be NULL or NA; however, got NULL")
}
if (is.na(seed)) {
stop("seed must not be NULL or NA; however, got NA")
}

# TODO : Figure out how to send integer as java.lang.Long to JVM so
# we can send seed as an argument through callJMethod
sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
sdf <- handledCallJMethod(x@sdf, "sample", as.logical(withReplacement),
as.numeric(fraction), as.integer(seed))
} else {
sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
sdf <- handledCallJMethod(x@sdf, "sample",
as.logical(withReplacement), as.numeric(fraction))
}
dataFrame(sdf)
})

#' @rdname sample
#' @aliases sample_frac,SparkDataFrame,logical,numeric-method
#' @aliases sample_frac,SparkDataFrame-method
#' @name sample_frac
#' @note sample_frac since 1.4.0
setMethod("sample_frac",
signature(x = "SparkDataFrame", withReplacement = "logical",
fraction = "numeric"),
function(x, withReplacement, fraction, seed) {
signature(x = "SparkDataFrame"),
function(x, withReplacement = FALSE, fraction, seed) {
sample(x, withReplacement, fraction, seed)
})

Expand Down
10 changes: 6 additions & 4 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -2226,8 +2226,9 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
})

#' @details
#' \code{from_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day in UTC,
#' returns another timestamp that corresponds to the same time of day in the given timezone.
#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
#' would yield '2017-07-14 03:40:00.0'.
#'
#' @rdname column_datetime_diff_functions
#'
Expand Down Expand Up @@ -2286,8 +2287,9 @@ setMethod("next_day", signature(y = "Column", x = "character"),
})

#' @details
#' \code{to_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day
#' in the given timezone, returns another timestamp that corresponds to the same time of day in UTC.
#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1'
#' would yield '2017-07-14 01:40:00.0'.
#'
#' @rdname column_datetime_diff_functions
#' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
#' @rdname sample
#' @export
setGeneric("sample",
function(x, withReplacement, fraction, seed) {
function(x, withReplacement = FALSE, fraction, seed) {
standardGeneric("sample")
})

Expand All @@ -656,7 +656,7 @@ setGeneric("rollup", function(x, ...) { standardGeneric("rollup") })
#' @rdname sample
#' @export
setGeneric("sample_frac",
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
function(x, withReplacement = FALSE, fraction, seed) { standardGeneric("sample_frac") })

#' @rdname sampleBy
#' @export
Expand Down
14 changes: 14 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,20 @@ test_that("sample on a DataFrame", {
sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
expect_true(count(sampled3) < 3)

# Different arguments
df <- createDataFrame(as.list(seq(10)))
expect_equal(count(sample(df, fraction = 0.5, seed = 3)), 4)
expect_equal(count(sample(df, withReplacement = TRUE, fraction = 0.5, seed = 3)), 2)
expect_equal(count(sample(df, fraction = 1.0)), 10)
expect_equal(count(sample(df, fraction = 1L)), 10)
expect_equal(count(sample(df, FALSE, fraction = 1.0)), 10)

expect_error(sample(df, fraction = "a"), "fraction must be numeric")
expect_error(sample(df, "a", fraction = 0.1), "however, got character")
expect_error(sample(df, fraction = 1, seed = NA), "seed must not be NULL or NA; however, got NA")
expect_error(sample(df, fraction = -1.0),
"illegal argument - requirement failed: Sampling fraction \\(-1.0\\)")

# nolint start
# Test base::sample is working
#expect_equal(length(sample(1:12)), 12)
Expand Down
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<version>3.1.0</version>
<executions>
<execution>
<id>dist</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.spark.network.util;

public enum ByteUnit {
BYTE (1),
KiB (1024L),
MiB ((long) Math.pow(1024L, 2L)),
GiB ((long) Math.pow(1024L, 3L)),
TiB ((long) Math.pow(1024L, 4L)),
PiB ((long) Math.pow(1024L, 5L));
BYTE(1),
KiB(1024L),
MiB((long) Math.pow(1024L, 2L)),
GiB((long) Math.pow(1024L, 3L)),
TiB((long) Math.pow(1024L, 4L)),
PiB((long) Math.pow(1024L, 5L));

ByteUnit(long multiplier) {
this.multiplier = multiplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;

import com.codahale.metrics.MetricSet;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,6 +118,12 @@ public void fetchBlocks(
}
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
return clientFactory.getAllMetrics();
}

/**
* Registers this executor with an external shuffle server. This registration is required to
* inform the shuffle server about where and how we store our shuffle files.
Expand All @@ -140,6 +147,7 @@ public void registerWithShuffleServer(

@Override
public void close() {
checkInit();
clientFactory.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.spark.network.shuffle;

import java.io.Closeable;
import java.util.Collections;

import com.codahale.metrics.MetricSet;

/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public abstract class ShuffleClient implements Closeable {
Expand Down Expand Up @@ -52,4 +55,13 @@ public abstract void fetchBlocks(
String[] blockIds,
BlockFetchingListener listener,
TempShuffleFileManager tempShuffleFileManager);

/**
* Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
* get the Shuffle related metrics.
*/
public MetricSet shuffleMetrics() {
// Return an empty MetricSet by default.
return () -> Collections.emptyMap();
}
}
27 changes: 12 additions & 15 deletions common/sketch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,17 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
27 changes: 12 additions & 15 deletions common/unsafe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,17 @@
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
<javacArg>-XDignore.symbol.file</javacArg>
</javacArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit 8d0d856

Please sign in to comment.