Skip to content

Commit

Permalink
[SPARK-12251] Document and improve off-heap memory configurations
Browse files Browse the repository at this point in the history
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs.

- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6).
- Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (apache#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion.
- Document these configurations on the configuration page.

Author: Josh Rosen <[email protected]>

Closes apache#10237 from JoshRosen/SPARK-12251.
  • Loading branch information
JoshRosen authored and Andrew Or committed Dec 10, 2015
1 parent 6a6c1fc commit 23a9e62
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 22 deletions.
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6"))
)

/**
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager(

storageMemoryPool.incrementPoolSize(storageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))

/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
Expand Down Expand Up @@ -182,7 +182,13 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
  // Off-heap Tungsten memory must be explicitly opted into via
  // `spark.memory.offHeap.enabled` (the deprecated `spark.unsafe.offHeap` key is
  // translated to this name by SparkConf's alternate-config handling).
  val offHeapEnabled = conf.getBoolean("spark.memory.offHeap.enabled", false)
  if (!offHeapEnabled) {
    MemoryMode.ON_HEAP
  } else {
    // Guard against enabling off-heap mode with a zero-byte budget: the off-heap
    // pool size comes from `spark.memory.offHeap.size`, and a 0 limit would cause
    // immediate allocation failures (OOMs) for any off-heap task memory request.
    require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    MemoryMode.OFF_HEAP
  }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.unsafe.offHeap", "false"),
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
1),
Expand All @@ -41,8 +41,10 @@ public void leakedPageMemoryIsDetected() {

@Test
public void encodePageNumberAndOffsetOffHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "true")), 0);
final SparkConf conf = new SparkConf()
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
// encode. This test exercises that corner-case:
Expand All @@ -55,7 +57,7 @@ public void encodePageNumberAndOffsetOffHeap() {
@Test
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
Expand Down Expand Up @@ -104,4 +106,15 @@ public void cooperativeSpilling() {
assert(manager.cleanUpAllAllocatedMemory() == 0);
}

@Test
public void offHeapConfigurationBackwardsCompatibility() {
  // The legacy `spark.unsafe.offHeap` key was deprecated in Spark 1.6 in favor of
  // `spark.memory.offHeap.enabled` (SPARK-12251); setting only the old key must still
  // put the memory manager into off-heap mode. A positive size is required because
  // off-heap mode rejects a zero-byte budget.
  final SparkConf legacyConf = new SparkConf();
  legacyConf.set("spark.unsafe.offHeap", "true");
  legacyConf.set("spark.memory.offHeap.size", "1000");
  final TaskMemoryManager taskManager =
    new TaskMemoryManager(new TestMemoryManager(legacyConf), 0);
  assert(taskManager.tungstenMemoryMode == MemoryMode.OFF_HEAP);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class PackedRecordPointerSuite {

@Test
public void heap() throws IOException {
final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
Expand All @@ -54,7 +54,9 @@ public void heap() throws IOException {

@Test
public void offHeap() throws IOException {
final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true");
final SparkConf conf = new SparkConf()
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "10000");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class ShuffleInMemorySorterSuite {

final TestMemoryManager memoryManager =
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);

Expand Down Expand Up @@ -64,7 +64,7 @@ public void testBasicSorting() throws Exception {
"Lychee",
"Mango"
};
final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void setUp() throws IOException {
spillFilesCreated.clear();
conf = new SparkConf()
.set("spark.buffer.pageSize", "1m")
.set("spark.unsafe.offHeap", "false");
.set("spark.memory.offHeap.enabled", "false");
taskMetrics = new TaskMetrics();
memoryManager = new TestMemoryManager(conf);
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ public void setup() {
memoryManager =
new TestMemoryManager(
new SparkConf()
.set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
.set("spark.memory.offHeapSize", "256mb"));
.set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator())
.set("spark.memory.offHeap.size", "256mb"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);

tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite {

final LinkedList<File> spillFilesCreated = new LinkedList<File>();
final TestMemoryManager memoryManager =
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
@Test
public void testSortingEmptyInput() {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
memoryManager,
Expand All @@ -71,7 +71,7 @@ public void testSortingOnlyByIntegerPrefix() throws Exception {
"Mango"
};
final TaskMemoryManager memoryManager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
conf.clone
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
.set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxStorageMemory = 0,
numCores = 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
.set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
.set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
Expand Down
16 changes: 16 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,22 @@ Apart from these, the following properties are also available, and may be useful
<a href="tuning.html#memory-management-overview">this description</a>.
</td>
</tr>
<tr>
<td><code>spark.memory.offHeap.enabled</code></td>
  <td>false</td>
<td>
If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
</td>
</tr>
<tr>
<td><code>spark.memory.offHeap.size</code></td>
<td>0</td>
<td>
The absolute amount of memory which can be used for off-heap allocation.
This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
</td>
</tr>
<tr>
<td><code>spark.memory.useLegacyMode</code></td>
<td>false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,11 @@ private[joins] final class UnsafeHashedRelation(
// so that tests compile:
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1), 0)
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MaxValue,
Long.MaxValue,
1),
0)

val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class UnsafeFixedWidthAggregationMapSuite
}

test(name) {
val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
pageSize: Long,
spill: Boolean): Unit = {
val memoryManager =
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"))
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,
Expand Down

0 comments on commit 23a9e62

Please sign in to comment.