diff --git a/build.sbt b/build.sbt index f4ede87e..c75f80b3 100644 --- a/build.sbt +++ b/build.sbt @@ -52,7 +52,7 @@ val catsVersion = "2.6.1" val catsEffectVersion = "3.2.9" val catsMtlVersion = "1.2.1" val disciplineScalaTestVersion = "2.1.5" -val fs2Version = "3.0.6" +val fs2Version = "3.1.6" val scalaJavaTimeVersion = "2.3.0" lazy val zioInteropCats = crossProject(JSPlatform, JVMPlatform) diff --git a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala index b28d7973..c69e1d48 100644 --- a/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala +++ b/zio-interop-cats/shared/src/main/scala/zio/stream/interop/FS2StreamSyntax.scala @@ -1,8 +1,8 @@ package zio package stream.interop -import fs2.Stream -import zio.interop.catz.{ concurrentInstance, zManagedSyntax, zioResourceSyntax } +import fs2.{ Pull, Stream } +import zio.interop.catz.{ concurrentInstance, zManagedSyntax } import zio.stream.{ Take, ZStream } import scala.language.implicitConversions @@ -38,39 +38,41 @@ final class FS2RIOStreamSyntax[R, A](private val stream: Stream[RIO[R, _], A]) { * @note when possible use only power of 2 queue sizes; this will provide better performance of the queue. */ def toZStream[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = - if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamSingle - - private def toZStreamSingle[R1 <: R]: ZStream[R1, Throwable, A] = - ZStream.managed { - for { - queue <- Queue.bounded[Take[Throwable, A]](1).toManaged[R](_.shutdown) - _ <- - (stream.evalTap(a => queue.offer(Take.single(a))) ++ fs2.Stream - .eval(queue.offer(Take.end))) - .handleErrorWith(e => fs2.Stream.eval(queue.offer(Take.fail(e))).drain) - .compile[RIO[R, _], RIO[R, _], Any] - .resource - .drain - .toManagedZIO - .fork - } yield ZStream.fromQueue(queue).flattenTake - }.flatten - - private def toZStreamChunk[R1 <: R](queueSize: Int): ZStream[R1, Throwable, A] = - ZStream.managed { - for { - queue <- Queue.bounded[Take[Throwable, A]](queueSize).toManaged(_.shutdown) - _ <- { - stream - .chunkLimit(queueSize) - .evalTap(a => queue.offer(Take.chunk(zio.Chunk.fromIterable(a.toList)))) - .unchunk ++ fs2.Stream.eval(queue.offer(Take.end)) - }.handleErrorWith(e => fs2.Stream.eval(queue.offer(Take.fail(e))).drain) - .compile[RIO[R, _], RIO[R, _], Any] - .resource + if (queueSize > 1) toZStreamChunk(queueSize) else toZStreamChunk(queueSize = 1) + + def toZStreamChunk[R1 <: R](queueSize: Int = 16): ZStream[R1, Throwable, A] = { + def integrate(stream: Stream[RIO[R, _], A], zQueue: Queue[Take[Throwable, A]]): Pull[RIO[R, _], A, Unit] = + stream.pull.uncons.flatMap { + case None => + fs2.Pull.eval(zQueue.offer(Take.end)) >> fs2.Pull.done + + case Some((fs2Chunk: fs2.Chunk[A], stream)) => + val offer = zQueue.offer(Take.chunk(toZioChunk(fs2Chunk))) + fs2.Pull.eval(offer) >> integrate(stream, zQueue) + } + + ZStream.fromEffect(Queue.bounded[Take[Throwable, A]](queueSize)).flatMap { q => + val toQueue = ZStream.fromEffect { + integrate(stream, q).stream + .handleErrorWith(e => Stream.eval(q.offer(Take.fail(e)))) + .compile .drain - .toManagedZIO - .fork - } yield ZStream.fromQueue(queue).flattenTake - }.flatten + } + + val fromQueue = ZStream.fromQueue(q).flattenTake + fromQueue.drainFork(toQueue) + } + } + + private def toZioChunk(in: fs2.Chunk[A]): Chunk[A] = + in match { + case fs2.Chunk.ArraySlice(values, _, _) => + Chunk.fromArray(values) + + case singleton: fs2.Chunk.Singleton[a] => + Chunk.single(singleton.value) + + case other => + Chunk.fromIterator(other.iterator) + } }