Skip to content

Commit

Permalink
[SPARK-3280] Made sort-based shuffle the default implementation
Browse files Browse the repository at this point in the history
Sort-based shuffle has lower memory usage and seems to outperform hash-based in almost all of our testing.

Author: Reynold Xin <[email protected]>

Closes apache#2178 from rxin/sort-shuffle and squashes the following commits:

713d341 [Reynold Xin] Fixed test failures by setting spark.shuffle.compress to the same value as spark.shuffle.spill.compress.
85165e6 [Reynold Xin] Fixed a comment typo.
aa0d372 [Reynold Xin] [SPARK-3280] Made sort-based shuffle the default implementation
  • Loading branch information
rxin committed Sep 8, 2014
1 parent 4ba2673 commit f25bbbd
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ object SparkEnv extends Logging {
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

Expand Down
33 changes: 33 additions & 0 deletions core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.scalatest.BeforeAndAfterAll

class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {

// This test suite should run all tests in ShuffleSuite with hash-based shuffle.

override def beforeAll() {
System.setProperty("spark.shuffle.manager", "hash")
}

override def afterAll() {
System.clearProperty("spark.shuffle.manager")
}
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/ShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair

class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {

val conf = new SparkConf(loadDefaults = false)

Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/SortShuffleSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.

override def beforeAll() {
System.setProperty("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.SortShuffleManager")
System.setProperty("spark.shuffle.manager", "sort")
}

override def afterAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
conf.set("spark.shuffle.compress", codec.isDefined.toString)
codec.foreach { c => conf.set("spark.io.compression.codec", c) }
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
Expand Down
9 changes: 4 additions & 5 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,11 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.shuffle.manager</code></td>
<td>HASH</td>
<td>sort</td>
<td>
Implementation to use for shuffling data. A hash-based shuffle manager is the default, but
starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more
memory-efficient in environments with small executors, such as YARN. To use that, change
this value to <code>SORT</code>.
Implementation to use for shuffling data. There are two implementations available:
<code>sort</code> and <code>hash</code>. Sort-based shuffle is more memory-efficient and is
the default option starting in 1.2.
</td>
</tr>
<tr>
Expand Down

0 comments on commit f25bbbd

Please sign in to comment.