[SPARK-19500] [SQL] Fix off-by-one bug in BytesToBytesMap (#53)
Merging Spark fix.

Radix sort requires that half of the array be free (as temporary space), so we use 0.5 as the scale factor to make sure that BytesToBytesMap never holds more items than half of its capacity. It turned out this is not guaranteed: the current implementation of append() could leave one more item than the threshold (half of capacity) in the array, which breaks the requirement of radix sort (failing the assert in 2.2, or failing to insert into InMemorySorter in 2.1).

This PR fixes the off-by-one bug in BytesToBytesMap.

This PR also fixes a bug introduced by apache#15722: if the array failed to grow once, it would never grow again and stayed at its initial capacity.
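To make the off-by-one concrete, here is a minimal standalone sketch (hypothetical class name and a hand-picked capacity of 64; this is not the actual BytesToBytesMap code). With the 0.5 scale factor the growth threshold is 32, so radix sort needs at most 32 live entries, but the old `>` comparison lets a 33rd append() succeed:

public class OffByOneSketch {
  public static void main(String[] args) {
    int capacity = 64;
    int growthThreshold = (int) (capacity * 0.5); // 32
    boolean canGrowArray = false; // e.g. an earlier grow attempt failed

    // Buggy guard: append() is refused only once numKeys EXCEEDS the
    // threshold, so the 33rd insert (numKeys == 32 before it) still goes in.
    int numKeys = 0;
    while (!(!canGrowArray && numKeys > growthThreshold)) {
      numKeys++; // simulated successful append()
    }
    System.out.println("'>' admits " + numKeys + " entries");  // prints 33

    // Fixed guard: '>=' stops at exactly growthThreshold entries.
    numKeys = 0;
    while (!(!canGrowArray && numKeys >= growthThreshold)) {
      numKeys++;
    }
    System.out.println("'>=' admits " + numKeys + " entries"); // prints 32
  }
}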
Conflicts:
	core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
rishitesh authored and ymahajan committed Jun 18, 2017
1 parent 24f8c9b commit 1a8fc0b
Showing 2 changed files with 44 additions and 2 deletions.
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

@@ -698,7 +698,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       if (numKeys == MAX_CAPACITY
           // The map could be reused from last spill (because of no enough memory to grow),
           // then we don't try to grow again if hit the `growthThreshold`.
-          || !canGrowArray && numKeys > growthThreshold) {
+          || !canGrowArray && numKeys >= growthThreshold) {
         return false;
       }

@@ -742,7 +742,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
       longArray.set(pos * 2 + 1, keyHashcode);
       isDefined = true;

-      if (numKeys > growthThreshold && longArray.size() < MAX_CAPACITY) {
+      if (numKeys >= growthThreshold && longArray.size() < MAX_CAPACITY) {
        try {
          growAndRehash();
        } catch (OutOfMemoryError oom) {

@@ -911,6 +911,7 @@ public void reset() {
       freePage(dataPage);
     }
     allocate(initialCapacity);
+    canGrowArray = true;
     currentPage = null;
     pageCursor = 0;
   }
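The reset() change above targets the second bug: once a grow attempt failed, canGrowArray stayed false for good, so a map reused after a spill was stuck at its initial capacity. A condensed, hypothetical lifecycle sketch of why reset() must restore the flag (names like GrowableMapSketch are invented; this is not the real class):

class GrowableMapSketch {
  private boolean canGrowArray = true;
  private int capacity;
  private final int initialCapacity;

  GrowableMapSketch(int initialCapacity) {
    this.initialCapacity = initialCapacity;
    this.capacity = initialCapacity;
  }

  void growAndRehash(boolean enoughMemory) {
    // In the real map this allocates a bigger array; the boolean parameter
    // stands in for whether that allocation succeeds.
    if (!enoughMemory) {
      canGrowArray = false; // grow failed: stop trying until the next reset()
      return;
    }
    capacity *= 2;
  }

  void reset() {
    capacity = initialCapacity;
    canGrowArray = true; // the fix: a reused map may try to grow again
  }
}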
UnsafeFixedWidthAggregationMapSuite.scala

@@ -342,4 +342,45 @@ class UnsafeFixedWidthAggregationMapSuite
     }
   }

+  testWithMemoryLeakDetection("convert to external sorter after fail to grow (SPARK-19500)") {
+    val pageSize = 4096000
+    val map = new UnsafeFixedWidthAggregationMap(
+      emptyAggregationBuffer,
+      aggBufferSchema,
+      groupKeySchema,
+      taskMemoryManager,
+      128, // initial capacity
+      pageSize,
+      false // disable perf metrics
+    )
+
+    val rand = new Random(42)
+    for (i <- 1 to 63) {
+      val str = rand.nextString(1024)
+      val buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+      buf.setInt(0, str.length)
+    }
+    // Simulate running out of space
+    memoryManager.limit(0)
+    var str = rand.nextString(1024)
+    var buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf != null)
+    str = rand.nextString(1024)
+    buf = map.getAggregationBuffer(InternalRow(UTF8String.fromString(str)))
+    assert(buf == null)
+
+    // Convert the map into a sorter. This used to fail before the fix for SPARK-10474
+    // because we would try to acquire space for the in-memory sorter pointer array before
+    // actually releasing the pages despite having spilled all of them.
+    var sorter: UnsafeKVExternalSorter = null
+    try {
+      sorter = map.destructAndCreateExternalSorter()
+      map.free()
+    } finally {
+      if (sorter != null) {
+        sorter.cleanupResources()
+      }
+    }
+  }
+
 }
