From 1e2e128fbf5bda75d3458d0a343ebf9f524c162e Mon Sep 17 00:00:00 2001 From: calvinlfer Date: Sat, 13 Nov 2021 10:47:56 -0500 Subject: [PATCH 1/2] Update FS2 integration logic and make optimizations to avoid re-chunking the FS2 stream & matching on each subtype of the fs2.Chunk to attempt conversion to a proper ZIO counterpart --- build.sbt | 2 +- .../zio/stream/interop/FS2StreamSyntax.scala | 94 ++++++++++++------- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/build.sbt b/build.sbt index f4ede87e..2814f409 100644 --- a/build.sbt +++ b/build.sbt @@ -52,7 +52,7 @@ val catsVersion = "2.6.1" val catsEffectVersion = "3.2.9" val catsMtlVersion = "1.2.1" val disciplineScalaTestVersion = "2.1.5" -val fs2Version = "3.0.6" +val fs2Version = "3.2.2" val scalaJavaTimeVersion = "2.3.0" lazy val zioInteropCats = crossProject(JSPlatform, JVMPlatform) diff --git a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala index b28d7973..58cee1a2 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala @@ -2,7 +2,7 @@ package zio package stream.interop import fs2.Stream -import zio.interop.catz.{ concurrentInstance, zManagedSyntax, zioResourceSyntax } +import zio.interop.catz.{ concurrentInstance, zManagedSyntax } import zio.stream.{ Take, ZStream } import scala.language.implicitConversions @@ -40,37 +40,65 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { def toZStream[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamSingle - private def toZStreamSingle[R1 <: R]: ZStream[R1, Throwable, A] = - ZStream.managed { - for { - queue <- Queue.bounded[Take[Throwable, A]](1).toManaged[R](_.shutdown) - _ <- - (stream.evalTap(a => queue.offer(Take.single(a))) ++ fs2.Stream - .eval(queue.offer(Take.end))) - .handleErrorWith(e => fs2.Stream.eval(queue.offer(Take.fail(e))).drain) - .compile[RIO[R, _], RIO[R, _], Any] - .resource - .drain - .toManagedZIO - .fork - } yield ZStream.fromQueue(queue).flattenTake - }.flatten - - private def toZStreamChunk[R1 <: R](queueSize: Int): ZStream[R1, Throwable, A] = - ZStream.managed { - for { - queue <- Queue.bounded[Take[Throwable, A]](queueSize).toManaged(_.shutdown) - _ <- { - stream - .chunkLimit(queueSize) - .evalTap(a => queue.offer(Take.chunk(zio.Chunk.fromIterable(a.toList)))) - .unchunk ++ fs2.Stream.eval(queue.offer(Take.end)) - }.handleErrorWith(e => fs2.Stream.eval(queue.offer(Take.fail(e))).drain) - .compile[RIO[R, _], RIO[R, _], Any] - .resource + def toZStreamChunk[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = { + def integrate(stream: fs2.Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): fs2.Pull[RIO[R, _], A, Unit] = + stream.pull.uncons.flatMap { + case None => + fs2.Pull.eval(zQueue.offer(Take.end)) >> fs2.Pull.done + + case Some((fs2Chunk: fs2.Chunk[A], stream)) => + val offer = zQueue.offer(Take.chunk(toZioChunk(fs2Chunk))) + fs2.Pull.eval(offer) >> integrate(stream, zQueue) + } + + convert(queueSize, integrate) + } + + private def toZStreamSingle[R1 <: R]: ZStream[R1, Throwable, A] = { + def integrate(stream: fs2.Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): fs2.Pull[RIO[R, _], A, Unit] = + stream.pull.uncons.flatMap { + case None => + fs2.Pull.eval(zQueue.offer(Take.end)) >> fs2.Pull.done + + case Some((elements, stream)) => + val offer = zQueue.offerAll(toZioChunk(elements).map(Take.single)) + fs2.Pull.eval(offer) >> integrate(stream, zQueue) + } + + convert(1, integrate) + } + + private def convert( + queueSize: Int, + integrate: (fs2.Stream[RIO[R, _], A], Queue[Take[Throwable, A]]) => fs2.Pull[RIO[R, _], A, Unit] + ) = + ZStream.fromEffect(Queue.bounded[Take[Throwable, A]](queueSize)).flatMap { q => + val toQueue = ZStream.fromEffect { + integrate(stream, q).stream + .handleErrorWith(e => fs2.Stream.eval(q.offer(Take.fail(e)))) + .compile .drain - .toManagedZIO - .fork - } yield ZStream.fromQueue(queue).flattenTake - }.flatten + } + + val fromQueue = ZStream.fromQueue(q).flattenTake + fromQueue.drainFork(toQueue) + } + + private def toZioChunk(in: fs2.Chunk[A]): Chunk[A] = + in match { + case fs2.Chunk.ArraySlice(values, _, _) => + Chunk.fromArray(values) + + case buffer: fs2.Chunk.Buffer[_, _, _] => + Chunk.fromIterator(buffer.iterator) + + case queue: fs2.Chunk.Queue[a] => + Chunk.fromIterator(queue.iterator) + + case singleton: fs2.Chunk.Singleton[a] => + Chunk.single(singleton.value) + + case unknown => + Chunk.fromIterable(unknown.toList) + } } From b76bd131eccaa171fc65491517c576c10a53c3cc Mon Sep 17 00:00:00 2001 From: calvinlfer Date: Sat, 13 Nov 2021 14:32:14 -0500 Subject: [PATCH 2/2] Update FS2 version to the latest built with Scala 3.0.1 and merge implementations of queue sizes --- build.sbt | 2 +- .../zio/stream/interop/FS2StreamSyntax.scala | 40 ++++--------------- 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/build.sbt b/build.sbt index 2814f409..c75f80b3 100644 --- a/build.sbt +++ b/build.sbt @@ -52,7 +52,7 @@ val catsVersion = "2.6.1" val catsEffectVersion = "3.2.9" val catsMtlVersion = "1.2.1" val disciplineScalaTestVersion = "2.1.5" -val fs2Version = "3.2.2" +val fs2Version = "3.1.6" val scalaJavaTimeVersion = "2.3.0" lazy val zioInteropCats = crossProject(JSPlatform, JVMPlatform) diff --git a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala index 58cee1a2..c69e1d48 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala @@ -1,7 +1,7 @@ package zio package stream.interop -import fs2.Stream +import fs2.{ Pull, Stream } import zio.interop.catz.{ concurrentInstance, zManagedSyntax } import zio.stream.{ Take, ZStream } @@ -38,10 +38,10 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { * @note when possible use only power of 2 queue sizes; this will provide better performance of the queue. */ def toZStream[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = - if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamSingle + if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamChunk(queueSize = 1) def toZStreamChunk[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = { - def integrate(stream: fs2.Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): fs2.Pull[RIO[R, _], A, Unit] = + def integrate(stream: Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): Pull[RIO[R, _], A, Unit] = stream.pull.uncons.flatMap { case None => fs2.Pull.eval(zQueue.offer(Take.end)) >> fs2.Pull.done @@ -51,31 +51,10 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { fs2.Pull.eval(offer) >> integrate(stream, zQueue) } - convert(queueSize, integrate) - } - - private def toZStreamSingle[R1 <: R]: ZStream[R1, Throwable, A] = { - def integrate(stream: fs2.Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): fs2.Pull[RIO[R, _], A, Unit] = - stream.pull.uncons.flatMap { - case None => - fs2.Pull.eval(zQueue.offer(Take.end)) >> fs2.Pull.done - - case Some((elements, stream)) => - val offer = zQueue.offerAll(toZioChunk(elements).map(Take.single)) - fs2.Pull.eval(offer) >> integrate(stream, zQueue) - } - - convert(1, integrate) - } - - private def convert( - queueSize: Int, - integrate: (fs2.Stream[RIO[R, _], A], Queue[Take[Throwable, A]]) => fs2.Pull[RIO[R, _], A, Unit] - ) = ZStream.fromEffect(Queue.bounded[Take[Throwable, A]](queueSize)).flatMap { q => val toQueue = ZStream.fromEffect { integrate(stream, q).stream - .handleErrorWith(e => fs2.Stream.eval(q.offer(Take.fail(e)))) + .handleErrorWith(e => Stream.eval(q.offer(Take.fail(e)))) .compile .drain } @@ -83,22 +62,17 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { val fromQueue = ZStream.fromQueue(q).flattenTake fromQueue.drainFork(toQueue) } + } private def toZioChunk(in: fs2.Chunk[A]): Chunk[A] = in match { case fs2.Chunk.ArraySlice(values, _, _) => Chunk.fromArray(values) - case buffer: fs2.Chunk.Buffer[_, _, _] => - Chunk.fromIterator(buffer.iterator) - - case queue: fs2.Chunk.Queue[a] => - Chunk.fromIterator(queue.iterator) - case singleton: fs2.Chunk.Singleton[a] => Chunk.single(singleton.value) - case unknown => - Chunk.fromIterable(unknown.toList) + case other => + Chunk.fromIterator(other.iterator) } }