From 1a8fc0b5eaca4d0fe6cea1f35df496f2e3428021 Mon Sep 17 00:00:00 2001
From: Rishitesh Mishra
Date: Fri, 2 Jun 2017 18:27:42 +0530
Subject: [PATCH] [SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap
 (#53)

Merging Spark fix.

Radix sort requires that half of the array be free (as temporary
space), so we use 0.5 as the scale factor to ensure that
BytesToBytesMap never holds more items than 1/2 of its capacity. It
turned out this is not true: the current implementation of append()
could leave one more item than the threshold (1/2 of capacity) in the
array, which breaks the requirement of radix sort (failing the assert
in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.
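For illustration, here is a standalone sketch of the boundary
condition (not the Spark code itself; the capacity of 64, the 0.5
load factor, and the class/method names are assumptions made for the
example):

    // Hypothetical, self-contained sketch (not Spark code): shows how
    // the old strict '>' guard admits growthThreshold + 1 keys once
    // the array can no longer grow, while the fixed '>=' guard stops
    // at the threshold itself.
    public class OffByOneSketch {
      // How many keys the append() guard admits before refusing.
      static int admittedKeys(boolean strictComparison) {
        final int capacity = 64;                            // assumed for the example
        final int growthThreshold = (int) (capacity * 0.5); // load factor 0.5 -> 32
        final boolean canGrowArray = false;                 // simulate a failed grow
        int numKeys = 0;
        while (true) {
          boolean refuse = strictComparison
              ? (!canGrowArray && numKeys > growthThreshold)   // old guard
              : (!canGrowArray && numKeys >= growthThreshold); // fixed guard
          if (numKeys == capacity || refuse) {
            return numKeys;
          }
          numKeys++; // the guard passed, so one more key is appended
        }
      }

      public static void main(String[] args) {
        System.out.println("old '>'  guard admits " + admittedKeys(true));  // prints 33
        System.out.println("new '>=' guard admits " + admittedKeys(false)); // prints 32
      }
    }

Since each key uses two entries of the pointer array, 33 keys occupy
66 of the 128 slots, more than half, so radix sort's requirement that
at least half of the array be free is violated; stopping at 32 keys
keeps exactly half of the array free.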
This PR also fixes a bug, introduced by #15722, where the array would
never grow again after a single failed grow (it stayed at the initial
capacity).

Conflicts:
	core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
---
 .../spark/unsafe/map/BytesToBytesMap.java     |  5 ++-
 .../UnsafeFixedWidthAggregationMapSuite.scala | 41 +++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 44120e591f2fb..4bef21b6b4e4d 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -698,7 +698,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       if (numKeys == MAX_CAPACITY
         // The map could be reused from last spill (because of no enough memory to grow),
         // then we don't try to grow again if hit the `growthThreshold`.
-        || !canGrowArray && numKeys > growthThreshold) {
+        || !canGrowArray && numKeys >= growthThreshold) {
         return false;
       }
 
@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;
 
-      if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+      if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
         try {
           growAndRehash();
         } catch (OutOfMemoryError oom) {
@@ -911,6 +911,7 @@ public void reset() {
       freePage(dataPage);
     }
     allocate(initialCapacity);
+    canGrowArray = true;
     currentPage = null;
     pageCursor = 0;
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index c1555114e8b3e..33fa520bac983 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -342,4 +342,45 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }
 
+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
+
 }