Replace Monoid constraint by CommutativeSemigroup in the reduce syntax #203

Merged: 11 commits, Jan 16, 2018
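For orientation, here is a rough usage sketch (not part of this PR's diff) of the syntax the change enables. It assumes a live `SparkContext` named `sc`, and the names and values are purely illustrative:

```scala
import frameless.cats.implicits._

// csum still works on non-empty RDDs, now via fold over a CommutativeMonoid.
val xs = sc.parallelize(Seq(1, 2, 3, 4))
xs.csum        // 10
xs.csumOption  // Some(10), needs only a CommutativeSemigroup[Int]

// On an empty RDD, csum falls back to the monoid's empty value,
// while the *Option variants signal the absence of elements.
val none = sc.emptyRDD[Int]
none.csum        // 0
none.csumOption  // None
```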
17 changes: 9 additions & 8 deletions build.sbt
@@ -1,7 +1,7 @@
 val sparkVersion = "2.2.0"
-val catsCoreVersion = "1.0.0-MF"
-val catsEffectVersion = "0.4"
-val catsMtlVersion = "0.0.2"
+val catsCoreVersion = "1.0.1"
+val catsEffectVersion = "0.7"
+val catsMtlVersion = "0.2.2"
 val scalatest = "3.0.3"
 val shapeless = "2.3.2"
 val scalacheck = "1.13.5"
@@ -27,11 +27,12 @@ lazy val cats = project
     scalacOptions += "-Ypartial-unification"
   )
   .settings(libraryDependencies ++= Seq(
-    "org.typelevel" %% "cats-core" % catsCoreVersion,
-    "org.typelevel" %% "cats-effect" % catsEffectVersion,
-    "org.typelevel" %% "cats-mtl-core" % catsMtlVersion,
-    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
-    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"))
+    "org.typelevel" %% "cats-core" % catsCoreVersion,
+    "org.typelevel" %% "cats-effect" % catsEffectVersion,
+    "org.typelevel" %% "cats-mtl-core" % catsMtlVersion,
+    "org.typelevel" %% "alleycats-core" % catsCoreVersion,
+    "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+    "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"))
   .dependsOn(dataset % "test->test;compile->compile")

 lazy val dataset = project
34 changes: 29 additions & 5 deletions cats/src/main/scala/frameless/cats/implicits.scala
@@ -1,21 +1,45 @@
 package frameless
 package cats

-import _root_.cats.implicits._
+import _root_.cats._
+import _root_.cats.kernel.{CommutativeMonoid, CommutativeSemigroup}
+import _root_.cats.implicits._
+import alleycats.Empty

 import scala.reflect.ClassTag
 import org.apache.spark.rdd.RDD

 object implicits extends FramelessSyntax with SparkDelayInstances {
   implicit class rddOps[A: ClassTag](lhs: RDD[A]) {
-    def csum(implicit m: Monoid[A]): A = lhs.reduce(_ |+| _)
-    def cmin(implicit o: Order[A]): A = lhs.reduce(_ min _)
-    def cmax(implicit o: Order[A]): A = lhs.reduce(_ max _)
+    def csum(implicit m: CommutativeMonoid[A]): A =
+      lhs.fold(m.empty)(_ |+| _)
+    def csumOption(implicit m: CommutativeSemigroup[A]): Option[A] =
+      lhs.aggregate[Option[A]](None)(
+        (acc, a) => Some(acc.fold(a)(_ |+| a)),
+        (l, r) => l.fold(r)(x => r.map(_ |+| x) <+> Some(x))
Review comment (Contributor):
I may be reading this wrong, but this seems incorrect to me. Isn't x getting added in twice in the case where both l and r are Some?

If so, it's concerning that unit tests didn't catch this. Maybe some places that are calling toRDD on a List should be calling parallelize?

Reply (Contributor, author):
I should probably replace the <+> operator with orElse, since I'm basically relying on the fact that the implementation of <+> for Option is exactly that.

In essence, I just want to express the Alternative between the two Options (<|>), but in cats this is implemented in terms of MonoidK, and therefore <|> becomes <+>...

Reply (Contributor):
Ah right, that makes sense. I think I'm one of the people who advocated for doing this in Cats, so I probably should have realized what was going on here :P

It may be more straightforward to just use orElse, but if it works the right way then my main concern is gone. Thanks for the explanation!
+      )

+    def cmin(implicit o: Order[A], e: Empty[A]): A = {
+      if (lhs.isEmpty) e.empty
+      else lhs.reduce(_ min _)
+    }
+    def cminOption(implicit o: Order[A]): Option[A] =
+      csumOption(new CommutativeSemigroup[A] {
+        def combine(l: A, r: A) = l min r
+      })
+
+    def cmax(implicit o: Order[A], e: Empty[A]): A = {
+      if (lhs.isEmpty) e.empty
+      else lhs.reduce(_ max _)
+    }
+    def cmaxOption(implicit o: Order[A]): Option[A] =
+      csumOption(new CommutativeSemigroup[A] {
+        def combine(l: A, r: A) = l max r
+      })
   }

   implicit class pairRddOps[K: ClassTag, V: ClassTag](lhs: RDD[(K, V)]) {
-    def csumByKey(implicit m: Monoid[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
+    def csumByKey(implicit m: CommutativeSemigroup[V]): RDD[(K, V)] = lhs.reduceByKey(_ |+| _)
     def cminByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ min _)
     def cmaxByKey(implicit o: Order[V]): RDD[(K, V)] = lhs.reduceByKey(_ max _)
   }
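As a standalone illustration of the point raised in the review thread above: a minimal sketch (not from this PR, assuming only cats on the classpath; the helper name `merge` is ours) of the partition-merge function, showing that `<+>` on `Option` behaves like `orElse`, so `x` is only used as a fallback and is never combined twice:

```scala
import cats.implicits._

// Mirrors the combiner passed to aggregate: merge two partial results.
def merge(l: Option[Int], r: Option[Int]): Option[Int] =
  l.fold(r)(x => r.map(_ |+| x) <+> Some(x))

merge(Some(2), Some(3))  // Some(5): r.map(_ |+| x) wins, Some(x) is ignored
merge(Some(2), None)     // Some(2): falls back to Some(x)
merge(None, Some(3))     // Some(3): l is empty, keep r
merge(None, None)        // None
```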
@@ -40,7 +40,7 @@ class FramelessSyntaxTests extends TypedDatasetSuite {
     def pure[A](x: A): ReaderT[IO, SparkSession, A] = ReaderT.pure(x)
     def handleErrorWith[A](fa: ReaderT[IO, SparkSession, A])(f: Throwable => ReaderT[IO, SparkSession, A]): ReaderT[IO, SparkSession, A] =
       ReaderT(r => fa.run(r).handleErrorWith(e => f(e).run(r)))
-    def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.lift(IO.raiseError(e))
+    def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.liftF(IO.raiseError(e))
     def flatMap[A, B](fa: ReaderT[IO, SparkSession, A])(f: A => ReaderT[IO, SparkSession, B]): ReaderT[IO, SparkSession, B] = fa.flatMap(f)
     def tailRecM[A, B](a: A)(f: A => ReaderT[IO, SparkSession, Either[A, B]]): ReaderT[IO, SparkSession, B] =
       ReaderT.catsDataMonadForKleisli[IO, SparkSession].tailRecM(a)(f)
65 changes: 48 additions & 17 deletions cats/src/test/scala/frameless/cats/test.scala
@@ -1,19 +1,23 @@
package frameless
package cats

import _root_.cats.Foldable
import _root_.cats.implicits._

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.scalatest.Matchers
import org.scalacheck.Arbitrary
import Arbitrary._
import org.apache.spark.rdd.RDD
import org.scalatest._
import prop._
import org.apache.spark.{SparkConf, SparkContext => SC}

import org.scalatest.compatible.Assertion
import org.scalactic.anyvals.PosInt
import org.scalatest.Matchers
import org.scalacheck.Arbitrary
import org.scalatest._
import Arbitrary._
import prop._

import scala.collection.immutable.SortedMap
import scala.reflect.ClassTag

trait SparkTests {
@@ -74,25 +78,52 @@ class Test extends PropSpec with Matchers with PropertyChecks with SparkTests {
     }
   }

-  property("rdd simple numeric monoid example") {
+  property("rdd simple numeric commutative semigroup") {
     import frameless.cats.implicits._
-    val theSeq = 1 to 20
-    val toy = theSeq.toRdd
-    toy.cmin shouldBe 1
-    toy.cmax shouldBe 20
-    toy.csum shouldBe theSeq.sum
+
+    forAll { seq: List[Int] =>
+      val expectedSum = if (seq.isEmpty) None else Some(seq.sum)
+      val expectedMin = if (seq.isEmpty) None else Some(seq.min)
+      val expectedMax = if (seq.isEmpty) None else Some(seq.max)
+
+      val rdd = seq.toRdd
+
+      rdd.cmin shouldBe expectedMin.getOrElse(0)
+      rdd.cminOption shouldBe expectedMin
+
+      rdd.cmax shouldBe expectedMax.getOrElse(0)
+      rdd.cmaxOption shouldBe expectedMax
+
+      rdd.csum shouldBe expectedSum.getOrElse(0)
+      rdd.csumOption shouldBe expectedSum
+    }
   }

-  property("rdd Map[Int,Int] monoid example") {
+  property("rdd of SortedMap[Int,Int] commutative monoid") {
     import frameless.cats.implicits._
-    val rdd: RDD[Map[Int, Int]] = 1.to(20).zip(1.to(20)).toRdd.map(Map(_))
-    rdd.csum shouldBe 1.to(20).zip(1.to(20)).toMap
+    forAll { seq: List[SortedMap[Int, Int]] =>
+      val rdd = seq.toRdd
+      rdd.csum shouldBe Foldable[List].fold(seq)
+    }
   }

+  property("rdd tuple commutative semigroup example") {
+    import frameless.cats.implicits._
+    forAll { seq: List[(Int, Int)] =>
+      val expectedSum = if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
+      val rdd = seq.toRdd
+
+      rdd.csum shouldBe expectedSum.getOrElse(0 -> 0)
+      rdd.csumOption shouldBe expectedSum
+    }
+  }
+
-  property("rdd tuple monoid example") {
+  property("pair rdd numeric commutative semigroup example") {
     import frameless.cats.implicits._
-    val seq = Seq( (1,2), (2,3), (5,6) )
+    val seq = Seq( ("a",2), ("b",3), ("d",6), ("b",2), ("d",1) )
     val rdd = seq.toRdd
-    rdd.csum shouldBe seq.reduce(_|+|_)
+    rdd.cminByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",2), ("d",1) )
+    rdd.cmaxByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",3), ("d",6) )
+    rdd.csumByKey.collect.toSeq should contain theSameElementsAs Seq( ("a",2), ("b",5), ("d",7) )
   }
 }
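The motivation for the tighter constraint, sketched outside the PR: Spark's `reduce` and `reduceByKey` merge partition results in no guaranteed order, so the combining function must be commutative as well as associative. Under the new signatures this is checked at compile time. A small illustrative sketch (assumes a `SparkContext` named `sc`; the value names are ours):

```scala
import frameless.cats.implicits._

// Int has a CommutativeSemigroup (addition), so this compiles and the result
// does not depend on the order in which partition results are merged:
val perKey = sc.parallelize(Seq("a" -> 1, "a" -> 2, "b" -> 3)).csumByKey

// String only has a non-commutative Semigroup (concatenation), so no
// CommutativeSemigroup[String] is in scope and the equivalent call is
// rejected at compile time:
// sc.parallelize(Seq("a" -> "x", "a" -> "y")).csumByKey  // does not compile
```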
25 changes: 21 additions & 4 deletions docs/src/main/tut/Cats.md
@@ -27,7 +27,7 @@ implicit val sync: Sync[ReaderT[IO, SparkSession, ?]] = new Sync[ReaderT[IO, Spa
   def pure[A](x: A): ReaderT[IO, SparkSession, A] = ReaderT.pure(x)
   def handleErrorWith[A](fa: ReaderT[IO, SparkSession, A])(f: Throwable => ReaderT[IO, SparkSession, A]): ReaderT[IO, SparkSession, A] =
     ReaderT(r => fa.run(r).handleErrorWith(e => f(e).run(r)))
-  def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.lift(IO.raiseError(e))
+  def raiseError[A](e: Throwable): ReaderT[IO, SparkSession, A] = ReaderT.liftF(IO.raiseError(e))
   def flatMap[A, B](fa: ReaderT[IO, SparkSession, A])(f: A => ReaderT[IO, SparkSession, B]): ReaderT[IO, SparkSession, B] = fa.flatMap(f)
   def tailRecM[A, B](a: A)(f: A => ReaderT[IO, SparkSession, Either[A, B]]): ReaderT[IO, SparkSession, B] =
     ReaderT.catsDataMonadForKleisli[IO, SparkSession].tailRecM(a)(f)
@@ -101,7 +101,7 @@ And now, we can set the description for the computation being run:
 val resultWithDescription: Action[(Seq[(Int, String)], Long)] = for {
   r <- result.withDescription("fancy cats")
   session <- ReaderT.ask[IO, SparkSession]
-  _ <- ReaderT.lift {
+  _ <- ReaderT.liftF {
     IO {
       println(s"Description: ${session.sparkContext.getLocalProperty("spark.job.description")}")
     }
@@ -131,6 +131,21 @@ println(data.cmax)
 println(data.cmin)
 ```

+In case the RDD is empty, `csum`, `cmax`, and `cmin` fall back to the default value for the RDD's
+element type. Each of these operations also has an `Option`-returning counterpart that handles the
+empty-RDD case explicitly:
+
+```tut:book
+val data: RDD[(Int, Int, Int)] = sc.emptyRDD
+
+println(data.csum)
+println(data.csumOption)
+println(data.cmax)
+println(data.cmaxOption)
+println(data.cmin)
+println(data.cminOption)
+```
+
 The following example aggregates all the elements with a common key.

 ```tut:book
@@ -148,9 +163,11 @@ totalPerUser.collectAsMap
 The same example would work for more complex keys.

 ```tut:book
+import scala.collection.immutable.SortedMap
+
 val allDataComplexKeu =
-  sc.makeRDD( ("Bob", Map("task1" -> 10)) ::
-    ("Joe", Map("task1" -> 1, "task2" -> 3)) :: ("Bob", Map("task1" -> 10, "task2" -> 1)) :: ("Joe", Map("task3" -> 4)) :: Nil )
+  sc.makeRDD( ("Bob", SortedMap("task1" -> 10)) ::
+    ("Joe", SortedMap("task1" -> 1, "task2" -> 3)) :: ("Bob", SortedMap("task1" -> 10, "task2" -> 1)) :: ("Joe", SortedMap("task3" -> 4)) :: Nil )

 val overalTasksPerUser = allDataComplexKeu.csumByKey