From c496d03b5289f7c604661a12af86f6accddcf125 Mon Sep 17 00:00:00 2001 From: Jie Xiong Date: Wed, 7 Dec 2016 04:33:30 -0800 Subject: [PATCH] [SPARK-18208][SHUFFLE] Executor OOM due to a growing LongArray in BytesToBytesMap ## What changes were proposed in this pull request? BytesToBytesMap currently does not release the in-memory storage (the longArray variable) after it spills to disk. This is typically not a problem during aggregation because the longArray should be much smaller than the pages, and because we grow the longArray at a conservative rate. However this can lead to an OOM when an already running task is allocated more than its fair share, this can happen because of a scheduling delay. In this case the longArray can grow beyond the fair share of memory for the task. This becomes problematic when the task spills and the long array is not freed, that causes subsequent memory allocation requests to be denied by the memory manager resulting in an OOM. This PR fixes this issuing by freeing the longArray when the BytesToBytesMap spills. ## How was this patch tested? Existing tests and tested on realworld workloads. Author: Jie Xiong Author: jiexiong Closes #15722 from jiexiong/jie_oom_fix. --- .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index d2fcdea4f2cee..44120e591f2fb 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -170,6 +170,8 @@ public final class BytesToBytesMap extends MemoryConsumer { private long peakMemoryUsedBytes = 0L; + private final int initialCapacity; + private final BlockManager blockManager; private final SerializerManager serializerManager; private volatile MapIterator destructiveIterator = null; @@ -202,6 +204,7 @@ public BytesToBytesMap( throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " + TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES); } + this.initialCapacity = initialCapacity; allocate(initialCapacity); } @@ -902,12 +905,12 @@ public LongArray getArray() { public void reset() { numKeys = 0; numValues = 0; - longArray.zeroOut(); - + freeArray(longArray); while (dataPages.size() > 0) { MemoryBlock dataPage = dataPages.removeLast(); freePage(dataPage); } + allocate(initialCapacity); currentPage = null; pageCursor = 0; }