Skip to content

Commit

Permalink
feat: Add support for for comprehensions. (#935)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jan 13, 2024
1 parent 6883d15 commit 72f0a42
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 7 deletions.
2 changes: 0 additions & 2 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|Sink|<a name="foreach"></a>@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.|
|Sink|<a name="foreachasync"></a>@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.|
|Sink|<a name="foreachparallel"></a>@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.|
|Sink|<a name="frommaterializer"></a>@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`|
Expand Down Expand Up @@ -446,7 +445,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [fold](Source-or-Flow/fold.md)
* [fold](Sink/fold.md)
* [foldAsync](Source-or-Flow/foldAsync.md)
* [foreach](Sink/foreach.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
* [from](Source/from.md)
Expand Down
7 changes: 6 additions & 1 deletion project/StreamOperatorsIndexGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"actorPublisher",
"addAttributes",
"mapMaterializedValue",
// for comprehensions
"withFilter",
"flatMap",
"foreach",
// *Graph:
"concatGraph",
"prependGraph",
Expand Down Expand Up @@ -108,7 +112,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"foldAsync",
"newOnCompleteStage"))

val ignore =
val ignore = {
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++
Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++
Set(
Expand All @@ -123,6 +127,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"transformMaterializing") ++
Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++
Set("++", "onPush", "onPull", "actorRefWithAck")
}

def isPending(element: String, opName: String) =
pendingTestCases.get(element).exists(_.contains(opName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"orElseGraph",
"divertToGraph")

val forComprehensions = Set("withFilter", "flatMap", "foreach")

val allowMissing: Map[Class[_], Set[String]] = Map(
jFlowClass -> graphHelpers,
jSourceClass -> (graphHelpers ++ Set("watch", "ask")),
jFlowClass -> (graphHelpers ++ forComprehensions),
jSourceClass -> (graphHelpers ++ forComprehensions ++ Set("watch", "ask")),
// Java subflows can only be nested using .via and .to (due to type system restrictions)
jSubFlowClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
jSubSourceClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")),
jSubFlowClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch",
"ask")),
jSubSourceClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow",
"watch", "ask")),
sFlowClass -> Set("of"),
sSourceClass -> Set("adapt", "from", "watch"),
sSinkClass -> Set("adapt"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.Done
import pekko.japi.Util
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.scaladsl.TestSink

import java.util.concurrent.CopyOnWriteArrayList
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

class ForComprehensionsCompileSpec extends StreamSpec {
"A Source" must {
"be able to be used in a for comprehension which yield" in {
val source = Source(1 to 5)
val evenSource = for {
i <- source if i % 2 == 0
} yield i.toString
evenSource.runWith(TestSink[String]())
.request(5)
.expectNextN(List("2", "4"))
.expectComplete()
}

"be able to be used in a for comprehension which flatMap" in {
val source = Source(1 to 5)
val evenSource = for {
i <- source if i % 2 == 0
j <- Source.lazySingle(() => i)
str = j.toString
} yield str
evenSource.runWith(TestSink[String]())
.request(5)
.expectNextN(List("2", "4"))
.expectComplete()
}

"be able to be used in a for comprehension which yield a runnable graph" in {
val source = Source(1 to 5)
val list = new CopyOnWriteArrayList[String]()
val future = (for (i <- source if i % 2 == 0) {
list.add(i.toString)
}).run()

Await.result(future, 3.seconds) shouldBe Done
Util.immutableSeq(list) shouldBe List("2", "4")
}

"be able to be used in a for comprehension which with Flow" in {
(for {
i <- Source(1 to 20) if i % 2 == 0
j <- Source.lazySingle(() => i)
str = j.toString
} yield str)
.via(for {
str <- Flow[String] if str.length > 1
doubleStr = str + str
number <- Source.lazySingle(() => doubleStr)
} yield number.toInt)
.runWith(TestSink[Int]())
.request(6)
.expectNextN(List(1010, 1212, 1414, 1616, 1818, 2020))
.expectComplete()
}
}

"A Flow" must {
"be able to be used in a for comprehension which yield" in {
Source(1 to 5).via(for (i <- Flow[Int] if i % 2 == 0) yield i.toString)
.runWith(TestSink[String]())
.request(5)
.expectNextN(List("2", "4"))
.expectComplete()
}

"be able to be used in a for comprehension which flatmap" in {
Source(1 to 5).via(for {
i <- Flow[Int] if i % 2 == 0
j <- Source.single(i)
str = j.toString
} yield str)
.runWith(TestSink[String]())
.request(5)
.expectNextN(List("2", "4"))
.expectComplete()
}

"be able to be used in a for comprehension which yield a sink" in {
val source = Source(1 to 5)
val list = new CopyOnWriteArrayList[String]()
val sink = for (i <- Flow[Int] if i % 2 == 0) {
list.add(i.toString)
}
val future = source.runWith(sink)
Await.result(future, 3.seconds) shouldBe Done
Util.immutableSeq(list) shouldBe List("2", "4")
}
}
}
31 changes: 31 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,16 @@ trait FlowOps[+Out, +Mat] {
*/
def filter(p: Out => Boolean): Repr[Out] = via(Filter(p))

/**
* Alias for [[filter]], added to enable filtering in for comprehensions.
*
* NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
* the internal implementation.
* @since 1.1.0
*/
@ApiMayChange
def withFilter(p: Out => Boolean): Repr[Out] = filter(p)

/**
* Only pass on those elements that NOT satisfy the given predicate.
*
Expand Down Expand Up @@ -2521,6 +2531,16 @@ trait FlowOps[+Out, +Mat] {
*/
def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1))

/**
* Alias for [[flatMapConcat]], added to enable for comprehensions.
*
* NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
* the internal implementation.
* @since 1.1.0
*/
@ApiMayChange
def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f)

/**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
Expand Down Expand Up @@ -3732,6 +3752,17 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3]

/**
* Connect this [[Flow]] to a `foreach` [[Sink]], that will invoke the given procedure for each received element.
* Added to enable for comprehensions.
*
* NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change
* the internal implementation.
* @since 1.1.0
*/
@ApiMayChange
def foreach(f: Out => Unit): ClosedMat[Future[Done]] = toMat(Sink.foreach(f))(Keep.right)

/**
* mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow.
* see [[#flatMapPrefix]] for details.
Expand Down

0 comments on commit 72f0a42

Please sign in to comment.