Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix batching for nested flatmaps at different levels of composition #108

Merged
merged 2 commits into from
Sep 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions src/main/scala/io/getclump/ClumpContext.scala
Original file line number Diff line number Diff line change
@@ -1,31 +1,49 @@
package io.getclump

import scala.collection.immutable.SortedMap
import scala.collection.mutable
import scala.concurrent.ExecutionContext

private[getclump] final class ClumpContext {

private[this] val fetchers = new mutable.HashMap[ClumpSource[_, _], ClumpFetcher[_, _]]()
private[this] val fetchers = mutable.HashMap.empty[ClumpSource[_, _], ClumpFetcher[_, _]]

def flush(clumps: List[Clump[_]])(implicit ec: ExecutionContext): Future[Unit] = {
// 1. Get a list of all visible clumps grouped by level of composition, starting at the highest level
val upstreamByLevel = getClumpsByLevel(clumps)
// 1. Get a list of all visible clumps
val upstream = getAllUpstream(clumps)

// 2. Flush the fetches from all the visible clumps
flushFetchesInParallel(upstreamByLevel.flatten).flatMap { _ =>
flushFetchesInParallel(upstream).flatMap { _ =>
// 3. Walk through the downstream clumps as well, starting at the deepest level
flushDownstreamByLevel(upstreamByLevel.reverse)
flushDownstreamByLevel(groupClumpsByLevel(upstream))
}
}

// Unfold all visible (ie. upstream) clumps from lowest to highest level
private[this] def getClumpsByLevel(clumps: List[Clump[_]]): List[List[Clump[_]]] = {
// Unfold all visible (ie. upstream) clumps
private[this] def getAllUpstream(clumps: List[Clump[_]]): List[Clump[_]] = {
clumps match {
case Nil => Nil
case _ => clumps :: getClumpsByLevel(clumps.flatMap(_.upstream))
case _ => clumps ::: getAllUpstream(clumps.flatMap(_.upstream))
}
}

// Strip the leaves at the bottom of the clump tree one level at a time so that these two conditions are satisfied:
// - Clumps appear in later lists than all their upstream children
// - Clumps appear as early in the list as possible
private[this] def groupClumpsByLevel(clumps: List[Clump[_]]): List[List[Clump[_]]] = {
// 1. Get the longest distance from this Clump to the bottom of the tree (memoized function)
val m = mutable.HashMap.empty[Clump[_], Int]
def getDistanceFromBottom(clump: Clump[_]): Int = m.getOrElseUpdate(clump, {
clump.upstream match {
case Nil => 0
case list => list.map(getDistanceFromBottom).max + 1
}
})

// 2. Group clumps by these levels and return the deepest level first
SortedMap(clumps.groupBy(getDistanceFromBottom).toSeq:_*).values.toList
}

private[this] def flushDownstreamByLevel(levels: List[List[Clump[_]]])(implicit ec: ExecutionContext): Future[Unit] = {
levels match {
case Nil => Future.successful(())
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/io/getclump/ClumpFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import scala.concurrent.ExecutionContext

private[getclump] final class ClumpFetcher[T, U](source: ClumpSource[T, U]) {

private[this] val fetches = mutable.LinkedHashMap[T, Promise[Option[U]]]()
private[this] val fetches = mutable.LinkedHashMap.empty[T, Promise[Option[U]]]

def get(input: T): Future[Option[U]] =
synchronized {
Expand Down
17 changes: 15 additions & 2 deletions src/test/scala/io/getclump/ClumpExecutionSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,22 @@ class ClumpExecutionSpec extends Spec {
source2Fetches mustEqual List(Set(3, 4))
}

// Implementation note: this test will fail if ClumpContext::getClumpsByLevel does not satisfy the requirement that
// "Clumps appear in later lists than all their upstream children"
"for clumps created inside nested flatmaps" in new Context {
val clump1 = Clump.value(1).flatMap(source1.get(_)).flatMap(source2.get(_))
val clump2 = Clump.value(2).flatMap(source1.get(_)).flatMap(source2.get(_))
val clump1 = Clump.value(1).flatMap(source1.get).flatMap(source2.get)
val clump2 = Clump.value(2).flatMap(source1.get).flatMap(source2.get)

clumpResult(Clump.collect(clump1, clump2)) mustEqual Some(List(100, 200))
source1Fetches mustEqual List(Set(1, 2))
source2Fetches mustEqual List(Set(20, 10))
}

// Implementation note: this test will fail if ClumpContext::getClumpsByLevel does not satisfy the requirement that
// "Clumps appear as early in the list as possible"
"for clumps created inside nested flatmaps at different levels of composition" in new Context {
val clump1 = Clump.value(1).flatMap(source1.get).flatMap(source2.get).map(identity)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the .map(identity) here used to make it quit correctly batching requests after the first one. It would still get the right answer but it was doing too many calls to the source.

val clump2 = Clump.value(2).flatMap(source1.get).flatMap(source2.get)

clumpResult(Clump.collect(clump1, clump2)) mustEqual Some(List(100, 200))
source1Fetches mustEqual List(Set(1, 2))
Expand Down