From 72f0a426b8fe5e8924ffa6bf4943ea6fbe16654c Mon Sep 17 00:00:00 2001 From: kerr Date: Sun, 14 Jan 2024 00:39:44 +0800 Subject: [PATCH] feat: Add support for `for comprehensions`. (#935) --- .../main/paradox/stream/operators/index.md | 2 - project/StreamOperatorsIndexGenerator.scala | 7 +- .../pekko/stream/DslConsistencySpec.scala | 12 +- .../ForComprehensionsCompileSpec.scala | 117 ++++++++++++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 31 +++++ 5 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index d797f53942d..ae0d0c40284 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -61,7 +61,6 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | |Sink|@ref[fold](Sink/fold.md)|Fold over emitted element with a function, where each invocation will get the new element and the result from the previous fold invocation.| -|Sink|@ref[foreach](Sink/foreach.md)|Invoke a given procedure for each element received.| |Sink|@ref[foreachAsync](Sink/foreachAsync.md)|Invoke a given procedure asynchronously for each element received.| |Sink|@ref[foreachParallel](Sink/foreachParallel.md)|Like `foreach` but allows up to `parallellism` procedure calls to happen in parallel.| |Sink|@ref[fromMaterializer](Sink/fromMaterializer.md)|Defer the creation of a `Sink` until materialization and access `Materializer` and `Attributes`| @@ -446,7 +445,6 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [fold](Source-or-Flow/fold.md) * [fold](Sink/fold.md) * [foldAsync](Source-or-Flow/foldAsync.md) -* [foreach](Sink/foreach.md) * [foreachAsync](Sink/foreachAsync.md) * [foreachParallel](Sink/foreachParallel.md) * [from](Source/from.md) diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 1c4974ef318..31c9c309ce5 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -67,6 +67,10 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "actorPublisher", "addAttributes", "mapMaterializedValue", + // for comprehensions + "withFilter", + "flatMap", + "foreach", // *Graph: "concatGraph", "prependGraph", @@ -108,7 +112,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "foldAsync", "newOnCompleteStage")) - val ignore = + val ignore = { Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ Set("productArity", "canEqual", "productPrefix", "copy", "productIterator", "productElement") ++ Set( @@ -123,6 +127,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "transformMaterializing") ++ Set("asScala", "asJava", "deprecatedAndThen", "deprecatedAndThenMat") ++ Set("++", "onPush", "onPull", "actorRefWithAck") + } def isPending(element: String, opName: String) = pendingTestCases.get(element).exists(_.contains(opName)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala 
b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index 557609e2886..d3188f9053b 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -91,12 +91,16 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "orElseGraph", "divertToGraph") + val forComprehensions = Set("withFilter", "flatMap", "foreach") + val allowMissing: Map[Class[_], Set[String]] = Map( - jFlowClass -> graphHelpers, - jSourceClass -> (graphHelpers ++ Set("watch", "ask")), + jFlowClass -> (graphHelpers ++ forComprehensions), + jSourceClass -> (graphHelpers ++ forComprehensions ++ Set("watch", "ask")), // Java subflows can only be nested using .via and .to (due to type system restrictions) - jSubFlowClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")), - jSubSourceClass -> (graphHelpers ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", "ask")), + jSubFlowClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", "watch", + "ask")), + jSubSourceClass -> (graphHelpers ++ forComprehensions ++ Set("groupBy", "splitAfter", "splitWhen", "subFlow", + "watch", "ask")), sFlowClass -> Set("of"), sSourceClass -> Set("adapt", "from", "watch"), sSinkClass -> Set("adapt"), diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala new file mode 100644 index 00000000000..4106282c6b4 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/ForComprehensionsCompileSpec.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.Done +import pekko.japi.Util +import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.TestSink + +import java.util.concurrent.CopyOnWriteArrayList +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt + +class ForComprehensionsCompileSpec extends StreamSpec { + "A Source" must { + "be able to be used in a for comprehension which yield" in { + val source = Source(1 to 5) + val evenSource = for { + i <- source if i % 2 == 0 + } yield i.toString + evenSource.runWith(TestSink[String]()) + .request(5) + .expectNextN(List("2", "4")) + .expectComplete() + } + + "be able to be used in a for comprehension which flatMap" in { + val source = Source(1 to 5) + val evenSource = for { + i <- source if i % 2 == 0 + j <- Source.lazySingle(() => i) + str = j.toString + } yield str + evenSource.runWith(TestSink[String]()) + .request(5) + .expectNextN(List("2", "4")) + .expectComplete() + } + + "be able to be used in a for comprehension which yield a runnable graph" in { + val source = Source(1 to 5) + val list = new CopyOnWriteArrayList[String]() + val future = (for (i <- source if i % 2 == 0) { + list.add(i.toString) + }).run() + + Await.result(future, 3.seconds) shouldBe Done + Util.immutableSeq(list) shouldBe List("2", "4") + } + + "be able to be used in a for comprehension which with Flow" in { + (for { + i <- Source(1 to 20) if i % 2 == 0 + j <- Source.lazySingle(() => i) + str = j.toString + } yield str) + .via(for { + str <- Flow[String] if str.length > 1 + doubleStr = str + str + number <- Source.lazySingle(() => doubleStr) + } yield number.toInt) + .runWith(TestSink[Int]()) + .request(6) + .expectNextN(List(1010, 1212, 1414, 1616, 1818, 2020)) + .expectComplete() + } + } + + "A Flow" must { + "be able to be used in a for comprehension which yield" in { + Source(1 to 5).via(for (i <- Flow[Int] if i % 2 == 0) yield i.toString) + .runWith(TestSink[String]()) + .request(5) + .expectNextN(List("2", "4")) + .expectComplete() + } + + "be able to be used in a for comprehension which flatmap" in { + Source(1 to 5).via(for { + i <- Flow[Int] if i % 2 == 0 + j <- Source.single(i) + str = j.toString + } yield str) + .runWith(TestSink[String]()) + .request(5) + .expectNextN(List("2", "4")) + .expectComplete() + } + + "be able to be used in a for comprehension which yield a sink" in { + val source = Source(1 to 5) + val list = new CopyOnWriteArrayList[String]() + val sink = for (i <- Flow[Int] if i % 2 == 0) { + list.add(i.toString) + } + val future = source.runWith(sink) + Await.result(future, 3.seconds) shouldBe Done + Util.immutableSeq(list) shouldBe List("2", "4") + } + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 9452ffc972e..6bb973aff54 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1383,6 +1383,16 @@ trait FlowOps[+Out, +Mat] { */ def filter(p: Out => Boolean): Repr[Out] = via(Filter(p)) + /** + * Alias for [[filter]], added to enable filtering in for comprehensions. + * + * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change + * the internal implementation. 
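+   *
+   * Illustrative example (mirroring ForComprehensionsCompileSpec in this patch; not a normative API sample):
+   * {{{
+   * // for (i <- Source(1 to 10) if i % 2 == 0) yield i.toString
+   * // desugars to:
+   * Source(1 to 10).withFilter(i => i % 2 == 0).map(_.toString)
+   * }}}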
+ * @since 1.1.0 + */ + @ApiMayChange + def withFilter(p: Out => Boolean): Repr[Out] = filter(p) + /** * Only pass on those elements that NOT satisfy the given predicate. * @@ -2521,6 +2531,16 @@ trait FlowOps[+Out, +Mat] { */ def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1)) + /** + * Alias for [[flatMapConcat]], added to enable for comprehensions. + * + * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change + * the internal implementation. + * @since 1.1.0 + */ + @ApiMayChange + def flatMap[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = flatMapConcat(f) + /** * Transform each input element into a `Source` of output elements that is * then flattened into the output stream by merging, where at most `breadth` @@ -3732,6 +3752,17 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { */ def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): ClosedMat[Mat3] + /** + * Connect this [[Flow]] to a `foreach` [[Sink]], that will invoke the given procedure for each received element. + * Added to enable for comprehensions. + * + * NOTE: Support for `for` comprehensions is still experimental and it's possible that we might need to change + * the internal implementation. + * @since 1.1.0 + */ + @ApiMayChange + def foreach(f: Out => Unit): ClosedMat[Future[Done]] = toMat(Sink.foreach(f))(Keep.right) + /** * mat version of [[#flatMapPrefix]], this method gives access to a future materialized value of the downstream flow. * see [[#flatMapPrefix]] for details.
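The three `@ApiMayChange` aliases added above (`withFilter`, `flatMap`, `foreach`) exist so that the Scala compiler's standard for-comprehension desugaring has operators to target. The following sketch is distilled from ForComprehensionsCompileSpec in this patch and shows how the new syntax rewrites onto stream operators; the object and actor-system names are illustrative placeholders rather than part of the change:

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.Source

// Illustrative sketch only; the object and system names are placeholders.
object ForComprehensionSketch extends App {
  implicit val system: ActorSystem = ActorSystem("for-comprehension-sketch")
  import system.dispatcher

  val numbers = Source(1 to 5)

  // A generator with a guard plus `yield` desugars to withFilter + map:
  //   numbers.withFilter(i => i % 2 == 0).map(_.toString)
  val evens = for (i <- numbers if i % 2 == 0) yield i.toString

  // A second generator desugars to flatMap, i.e. the new alias for flatMapConcat:
  //   numbers.withFilter(i => i % 2 == 0)
  //     .flatMap(i => Source.single(i * 10).map(_.toString))
  val flattened = for {
    i <- numbers if i % 2 == 0
    j <- Source.single(i * 10)
  } yield j.toString

  // A `for` body without `yield` desugars to foreach, which connects
  // Sink.foreach and keeps its Future[Done] as the materialized value:
  val done = (for (line <- evens.concat(flattened)) println(line)).run()

  done.onComplete(_ => system.terminate())
}

Because `foreach` returns the fully connected graph with `Sink.foreach`'s `Future[Done]` as its materialized value, a comprehension without `yield` behaves like `runForeach`, except that the resulting graph still has to be run explicitly.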