Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4715][Core] Make sure tryToAcquire won't return a negative value #3575

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
val curMem = threadMemory(threadId)
val freeMemory = maxMemory - threadMemory.values.sum

// How much we can grant this thread; don't let it grow to more than 1 / numActiveThreads
val maxToGrant = math.min(numBytes, (maxMemory / numActiveThreads) - curMem)
// How much we can grant this thread; don't let it grow to more than 1 / numActiveThreads;
// don't let it be negative
val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveThreads) - curMem))

if (curMem < maxMemory / (2 * numActiveThreads)) {
// We want to let each thread get at least 1 / (2 * numActiveThreads) before blocking;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class ShuffleMemoryManagerSuite extends FunSuite with Timeouts {

test("threads can block to get at least 1 / 2N memory") {
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
// for a bit and releases 250 bytes, which should then be greanted to t2. Further requests
// for a bit and releases 250 bytes, which should then be granted to t2. Further requests
// by t2 will return false right away because it now has 1 / 2N of the memory.

val manager = new ShuffleMemoryManager(1000L)
Expand Down Expand Up @@ -291,4 +291,19 @@ class ShuffleMemoryManagerSuite extends FunSuite with Timeouts {
assert(state.t2WaitTime > 200, s"t2 waited less than 200 ms (${state.t2WaitTime})")
}
}

test("threads should not be granted a negative size") {
val manager = new ShuffleMemoryManager(1000L)
manager.tryToAcquire(700L)

val latch = new CountDownLatch(1)
startThread("t1") {
manager.tryToAcquire(300L)
latch.countDown()
}
latch.await() // Wait until `t1` calls `tryToAcquire`

val granted = manager.tryToAcquire(300L)
assert(0 === granted, "granted is negative")
}
}