Commit
Merge remote-tracking branch 'apache-github/master' into SPARK-15214
hvanhovell committed Jun 3, 2016
2 parents b3531cb + 28ad0f7 commit 5cfba19
Showing 241 changed files with 3,713 additions and 2,007 deletions.
6 changes: 5 additions & 1 deletion build/mvn
@@ -141,9 +141,13 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
ZINC_JAVA_HOME=
if [ -n "$JAVA_7_HOME" ]; then
ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
$ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
@@ -22,7 +22,7 @@
/**
* Base interface for a function used in Dataset's filter function.
*
* If the function returns true, the element is discarded in the returned Dataset.
* If the function returns true, the element is included in the returned Dataset.
*/
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
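For illustration (not part of this commit), a minimal Scala sketch of the corrected semantics, assuming a SparkSession named spark with spark.implicits._ in scope; returning true keeps the element:

  import org.apache.spark.api.java.function.FilterFunction

  val ds = spark.createDataset(Seq(1, 2, 3, 4))
  // Returning true keeps the element, so evens contains 2 and 4; 1 and 3 are dropped.
  val evens = ds.filter(new FilterFunction[Int] {
    def call(value: Int): Boolean = value % 2 == 0
  })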
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
* Java programming guide for more details.
*/
package object function
package object function
@@ -23,7 +23,7 @@
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* An memory consumer of TaskMemoryManager, which support spilling.
* A memory consumer of {@link TaskMemoryManager} that supports spilling.
*
* Note: this only supports allocation / spilling of Tungsten memory.
*/
@@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
}

/**
* Returns the memory mode, ON_HEAP or OFF_HEAP.
* Returns the memory mode, {@link MemoryMode#ON_HEAP} or {@link MemoryMode#OFF_HEAP}.
*/
public MemoryMode getMode() {
return mode;
16 changes: 9 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, ServiceLoader, UUID}
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

@@ -356,12 +356,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
*/
def setLogLevel(logLevel: String) {
val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")
if (!validLevels.contains(logLevel)) {
throw new IllegalArgumentException(
s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")
}
Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))
// let's allow lowcase or mixed case too
val upperCased = logLevel.toUpperCase(Locale.ENGLISH)
require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
s"Supplied level $logLevel did not match one of:" +
s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
}

try {
@@ -2179,6 +2179,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* various Spark features.
*/
object SparkContext extends Logging {
private val VALID_LOG_LEVELS =
Set("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")

/**
* Lock that guards access to global variables that track SparkContext construction.
@@ -596,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)

@@ -605,6 +606,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
: Accumulator[T] =
sc.accumulator(initialValue, name)(accumulatorParam)
@@ -613,6 +615,7 @@
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
sc.accumulable(initialValue)(param)

@@ -622,6 +625,7 @@
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
: Accumulable[T, R] =
sc.accumulable(initialValue, name)(param)
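For illustration (not part of this commit), a sketch of the AccumulatorV2-style replacement the deprecation messages point to, assuming a live Scala SparkContext sc (reachable from a JavaSparkContext through its sc field); it mirrors the updated DistributedSuite test later in this diff:

  // LongAccumulator belongs to the AccumulatorV2 API that supersedes accumulator/accumulable.
  val acc = sc.longAccumulator("myCounter")
  sc.parallelize(1 to 10).foreach(x => acc.add(x))
  assert(acc.value == 55)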
@@ -89,6 +89,7 @@ private[spark] abstract class RestSubmissionServer(
server.addConnector(connector)

val mainHandler = new ServletContextHandler
mainHandler.setServer(server)
mainHandler.setContextPath("/")
contextToServlet.foreach { case (prefix, servlet) =>
mainHandler.addServlet(new ServletHolder(servlet), prefix)
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)

var noLocality = true // if true if no preferredLocations exists for parent RDD

// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}

class PartitionLocations(prev: RDD[_]) {

// contains all the partitions from the previous RDD that don't have preferred locations
@@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
// first get the locations for each partition, only do this once since it can be expensive
prev.partitions.foreach(p => {
val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
val locs = currPrefLocs(p, prev)
if (locs.size > 0) {
tmpPartsWithLocs.put(p, locs)
} else {
@@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
balanceSlack: Double,
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
// least loaded pref locs
val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head

val r1 = rnd.nextInt(groupArr.size)
@@ -83,6 +83,7 @@ private[spark] class TaskSchedulerImpl(
// on this class.
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

// Protected by `this`
private[scheduler] val taskIdToTaskSetManager = new HashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]

@@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// manager to reregister itself. If that happens, the block manager master will know
// about the executor, but the scheduler will not. Therefore, we should remove the
// executor from the block manager when we hit this case.
scheduler.sc.env.blockManager.master.removeExecutor(executorId)
scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
logInfo(s"Asked to remove non-existent executor $executorId")
}
}
@@ -211,9 +211,6 @@ private[storage] class BlockInfoManager extends Logging {
* If another task has already locked this block for either reading or writing, then this call
* will block until the other locks are released or will return immediately if `blocking = false`.
*
* If this is called by a task which already holds the block's exclusive write lock, then this
* method will throw an exception.
*
* @param blockId the block to lock.
* @param blocking if true (default), this call will block until the lock is acquired. If false,
* this call will return immediately if the lock acquisition fails.
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -402,6 +402,17 @@ private[spark] class BlockManager(
locations
}

/**
* Cleanup code run in response to a failed local read.
* Must be called while holding a read lock on the block.
*/
private def handleLocalReadFailure(blockId: BlockId): Nothing = {
releaseLock(blockId)
// Remove the missing block so that its unavailability is reported to the driver
removeBlock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
}

/**
* Get block from local block manager as an iterator of Java objects.
*/
@@ -441,8 +452,7 @@
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
handleLocalReadFailure(blockId)
}
}
}
@@ -489,8 +499,7 @@
// The block was not found on disk, so serialize an in-memory copy:
serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
handleLocalReadFailure(blockId)
}
} else { // storage level is serialized
if (level.useMemory && memoryStore.contains(blockId)) {
@@ -499,8 +508,7 @@
val diskBytes = diskStore.getBytes(blockId)
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
handleLocalReadFailure(blockId)
}
}
}
@@ -42,6 +42,14 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}

/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
def removeExecutorAsync(execId: String) {
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
logInfo("Removal of executor " + execId + " requested")
}
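For context (not part of this commit), a sketch of why this variant does not block, assuming RpcEndpointRef.ask returns a Future; the value name below is illustrative only:

  // ask[T] hands back a Future[T] immediately; this method never awaits it, so the caller
  // proceeds while the driver handles RemoveExecutor in the background.
  val pendingReply: scala.concurrent.Future[Boolean] =
    driverEndpoint.ask[Boolean](RemoveExecutor(execId))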

/** Register the BlockManager's id with the driver. */
def registerBlockManager(
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -120,8 +120,14 @@ class StorageLevel private(
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

override def toString: String = {
s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
s"deserialized=$deserialized, replication=$replication)"
val disk = if (useDisk) "disk" else ""
val memory = if (useMemory) "memory" else ""
val heap = if (useOffHeap) "offheap" else ""
val deserialize = if (deserialized) "deserialized" else ""

val output =
Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
s"StorageLevel(${output.mkString(", ")})"
}

override def hashCode(): Int = toInt * 41 + replication
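For illustration (not part of this commit), the reworked toString renders the built-in levels roughly as follows; the outputs are reconstructed from the code above, not copied from a REPL:

  StorageLevel.MEMORY_AND_DISK.toString    // "StorageLevel(disk, memory, deserialized, 1 replicas)"
  StorageLevel.DISK_ONLY.toString          // "StorageLevel(disk, 1 replicas)"
  StorageLevel.MEMORY_ONLY_SER_2.toString  // "StorageLevel(memory, 2 replicas)"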
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -291,6 +291,7 @@ private[spark] object JettyUtils extends Logging {

val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(server)
server.addBean(errorHandler)
server.setHandler(collection)
try {
31 changes: 13 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2344,29 +2344,24 @@ private[spark] class RedirectThread(
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)
private var pos: Int = 0
private var isBufferFull = false
private val buffer = new Array[Byte](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
def write(input: Int): Unit = {
buffer(pos) = input.toByte
pos = (pos + 1) % buffer.length
isBufferFull = isBufferFull || (pos == 0)
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
if (!isBufferFull) {
return new String(buffer, 0, pos, StandardCharsets.UTF_8)
}
stringBuilder.toString()

val nonCircularBuffer = new Array[Byte](sizeInBytes)
System.arraycopy(buffer, pos, nonCircularBuffer, 0, buffer.length - pos)
System.arraycopy(buffer, 0, nonCircularBuffer, buffer.length - pos, pos)
new String(nonCircularBuffer, StandardCharsets.UTF_8)
}
}
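For illustration (not part of this commit), a rough usage sketch of the rewritten buffer; CircularBuffer is private[spark], so this is conceptual rather than a public API, and it assumes ASCII input so one write equals one character:

  val buf = new CircularBuffer(sizeInBytes = 4)
  "abc".foreach(c => buf.write(c))
  buf.toString   // returns "abc": not yet full, so only the bytes written so far appear
  "defg".foreach(c => buf.write(c))
  buf.toString   // returns "defg": once full, the most recent sizeInBytes bytes come back in write order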
5 changes: 2 additions & 3 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -92,8 +92,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex

test("accumulators") {
sc = new SparkContext(clusterUrl, "test")
val accum = sc.accumulator(0)
sc.parallelize(1 to 10, 10).foreach(x => accum += x)
val accum = sc.longAccumulator
sc.parallelize(1 to 10, 10).foreach(x => accum.add(x))
assert(accum.value === 55)
}

@@ -109,7 +109,6 @@

test("repeatedly failing task") {
sc = new SparkContext(clusterUrl, "test")
val accum = sc.accumulator(0)
val thrown = intercept[SparkException] {
// scalastyle:off println
sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import java.io.{IOException, NotSerializableException, ObjectInputStream}

import org.apache.spark.memory.TestMemoryConsumer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NonSerializable

// Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
FailureSuiteState.clear()
}

test("failure because cached RDD partitions are missing from DiskStore (SPARK-15736)") {
sc = new SparkContext("local[1,2]", "test")
val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
rdd.count()
// Directly delete all files from the disk store, triggering failures when reading cached data:
SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
// Each task should fail once due to missing cached data, but then should succeed on its second
// attempt because the missing cache locations will be purged and the blocks will be recomputed.
rdd.count()
}

// TODO: Need to add tests with shuffle fetch failures.
}

15 changes: 15 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -363,4 +363,19 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
sc.stop()
assert(result == null)
}

test("log level case-insensitive and reset log level") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val originalLevel = org.apache.log4j.Logger.getRootLogger().getLevel
try {
sc.setLogLevel("debug")
assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.DEBUG)
sc.setLogLevel("INfo")
assert(org.apache.log4j.Logger.getRootLogger().getLevel === org.apache.log4j.Level.INFO)
} finally {
sc.setLogLevel(originalLevel.toString)
assert(org.apache.log4j.Logger.getRootLogger().getLevel === originalLevel)
sc.stop()
}
}
}