
Resolved merge conflict in MimExcludes
sarutak committed Sep 29, 2014
2 parents 2cdd009 + 0bbe7fa commit 97cb85c
Showing 39 changed files with 2,287 additions and 884 deletions.
43 changes: 37 additions & 6 deletions core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -17,15 +17,17 @@

 package org.apache.spark.network

-import java.io.{FileInputStream, RandomAccessFile, File, InputStream}
+import java.io._
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.channels.FileChannel.MapMode

+import scala.util.Try
+
 import com.google.common.io.ByteStreams
 import io.netty.buffer.{ByteBufInputStream, ByteBuf}

-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}


 /**
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
     try {
       channel = new RandomAccessFile(file, "r").getChannel
       channel.map(MapMode.READ_ONLY, offset, length)
+    } catch {
+      case e: IOException =>
+        Try(channel.size).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
     } finally {
       if (channel != null) {
-        channel.close()
+        Utils.tryLog(channel.close())
       }
     }
   }

   override def inputStream(): InputStream = {
-    val is = new FileInputStream(file)
-    is.skip(offset)
-    ByteStreams.limit(is, length)
+    var is: FileInputStream = null
+    try {
+      is = new FileInputStream(file)
+      is.skip(offset)
+      ByteStreams.limit(is, length)
+    } catch {
+      case e: IOException =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        Try(file.length).toOption match {
+          case Some(fileLen) =>
+            throw new IOException(s"Error in reading $this (actual file length $fileLen)", e)
+          case None =>
+            throw new IOException(s"Error in opening $this", e)
+        }
+      case e: Throwable =>
+        if (is != null) {
+          Utils.tryLog(is.close())
+        }
+        throw e
+    }
+  }

   override def toString: String = s"${getClass.getName}($file, $offset, $length)"
 }
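In both methods above, the Try(...).toOption idiom keeps the error report itself from failing a second time: if even querying the file length throws, the match falls through to None and the message simply omits the length. A minimal standalone sketch of the idiom (the path and fallback message are illustrative, not from the commit):

    import java.io.File
    import scala.util.Try

    val file = new File("/tmp/example.dat")  // illustrative path
    // File.length can throw (e.g. SecurityException under a security manager);
    // Try turns any such failure into None instead of raising a second exception.
    val detail = Try(file.length).toOption match {
      case Some(len) => s"actual file length $len"
      case None      => "file length unavailable"
    }
    println(s"Error in reading $file ($detail)")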


2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1209,7 +1209,7 @@ class DAGScheduler(
           .format(job.jobId, stageId))
       } else if (jobsForStage.get.size == 1) {
         if (!stageIdToStage.contains(stageId)) {
-          logError("Missing Stage for stage with id $stageId")
+          logError(s"Missing Stage for stage with id $stageId")
         } else {
           // This is the only job that uses this stage, so fail the stage if it is running.
           val stage = stageIdToStage(stageId)
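The one-character fix above matters because Scala only substitutes $-placeholders in strings carrying the `s` interpolator prefix; a bare string literal logs the placeholder text verbatim. A quick illustration (the stage id value is made up):

    val stageId = 42
    println("Missing Stage for stage with id $stageId")   // logs the literal text "$stageId"
    println(s"Missing Stage for stage with id $stageId")  // logs "Missing Stage for stage with id 42"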
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging {
     }
   }

+  /** Executes the given block in a Try, logging any uncaught exceptions. */
+  def tryLog[T](f: => T): Try[T] = {
+    try {
+      val res = f
+      scala.util.Success(res)
+    } catch {
+      case ct: ControlThrowable =>
+        throw ct
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        scala.util.Failure(t)
+    }
+  }
+
   /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
     e match {
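Note that ControlThrowable (the mechanism behind Scala's breaks and non-local returns) is deliberately rethrown so control flow is never swallowed. A sketch of the intended usage, mirroring the FileSegmentManagedBuffer changes above (the stream and path here are hypothetical, and since Utils is private[spark], real callers live inside Spark packages):

    import java.io.FileInputStream
    import org.apache.spark.util.Utils

    val in = new FileInputStream("/tmp/example")  // hypothetical resource
    try {
      in.read()
    } finally {
      // close() can itself throw; tryLog logs that failure and returns a Failure
      // rather than letting a cleanup error mask an exception already in flight.
      Utils.tryLog(in.close())
    }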
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -144,7 +144,7 @@ private[spark] class ExternalSorter[K, V, C](
       override def compare(a: K, b: K): Int = {
         val h1 = if (a == null) 0 else a.hashCode()
         val h2 = if (b == null) 0 else b.hashCode()
-        h1 - h2
+        if (h1 < h2) -1 else if (h1 == h2) 0 else 1
       }
     })

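The replaced `h1 - h2` is the classic subtraction-comparator bug: when the two hash codes are far apart, the Int subtraction overflows, the sign of the result flips, and the ordering stops being transitive. A two-line demonstration:

    val h1 = Int.MinValue + 1  // e.g. the hashCode of some unlucky key
    val h2 = 2
    println(h1 - h2)  // 2147483647: overflow turns the negative difference positive, claiming h1 > h2
    println(if (h1 < h2) -1 else if (h1 == h2) 0 else 1)  // -1, the correct sign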
55 changes: 55 additions & 0 deletions core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -24,6 +24,8 @@ import org.scalatest.{PrivateMethodTester, FunSuite}
 import org.apache.spark._
 import org.apache.spark.SparkContext._

+import scala.util.Random
+
 class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
   private def createSparkConf(loadDefaults: Boolean): SparkConf = {
     val conf = new SparkConf(loadDefaults)
@@ -707,4 +709,57 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
       Some(agg), Some(new HashPartitioner(FEW_PARTITIONS)), None, None)
     assertDidNotBypassMergeSort(sorter4)
   }
+
+  test("sort without breaking sorting contracts") {
+    val conf = createSparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.01")
+    conf.set("spark.shuffle.manager", "sort")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    // Use wrongOrdering to show that integer overflow in the comparator triggers an exception.
+    val rand = new Random(100L)
+    val wrongOrdering = new Ordering[String] {
+      override def compare(a: String, b: String) = {
+        val h1 = if (a == null) 0 else a.hashCode()
+        val h2 = if (b == null) 0 else b.hashCode()
+        h1 - h2
+      }
+    }
+
+    val testData = Array.tabulate(100000) { _ => rand.nextInt().toString }
+
+    val sorter1 = new ExternalSorter[String, String, String](
+      None, None, Some(wrongOrdering), None)
+    val thrown = intercept[IllegalArgumentException] {
+      sorter1.insertAll(testData.iterator.map(i => (i, i)))
+      sorter1.iterator
+    }
+
+    assert(thrown.getClass() === classOf[IllegalArgumentException])
+    assert(thrown.getMessage().contains("Comparison method violates its general contract"))
+    sorter1.stop()
+
+    // Use aggregation and an external spill to make sure ExternalSorter uses
+    // the partitionKeyComparator.
+    def createCombiner(i: String) = ArrayBuffer(i)
+    def mergeValue(c: ArrayBuffer[String], i: String) = c += i
+    def mergeCombiners(c1: ArrayBuffer[String], c2: ArrayBuffer[String]) = c1 ++= c2
+
+    val agg = new Aggregator[String, String, ArrayBuffer[String]](
+      createCombiner, mergeValue, mergeCombiners)
+
+    val sorter2 = new ExternalSorter[String, String, ArrayBuffer[String]](
+      Some(agg), None, None, None)
+    sorter2.insertAll(testData.iterator.map(i => (i, i)))
+
+    // Validate that keys come out in non-decreasing order of hashCode.
+    var minKey = Int.MinValue
+    sorter2.iterator.foreach { case (k, v) =>
+      val h = k.hashCode()
+      assert(h >= minKey)
+      minKey = h
+    }
+
+    sorter2.stop()
+  }
 }
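The first half of the test works because Spark's sort path uses a port of TimSort, which can detect an inconsistent comparator while merging runs and throws IllegalArgumentException("Comparison method violates its general contract!"). Detection is not guaranteed for every input, but 100,000 random keys trigger it reliably. The same failure can be sketched outside Spark with java.util.Arrays.sort, which has used TimSort for object arrays since JDK 7 (whether it actually throws depends on the data):

    import java.util.{Arrays, Comparator}
    import scala.util.Random

    val rand = new Random(100L)
    val data = Array.tabulate(100000)(_ => rand.nextInt().toString)
    // The same overflow-prone comparator the test guards against.
    val broken = new Comparator[String] {
      override def compare(a: String, b: String): Int = a.hashCode - b.hashCode
    }
    try {
      Arrays.sort(data, broken)  // TimSort may detect the broken contract mid-sort
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }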
