Skip to content

Commit

Permalink
Merge pull request #2895 from yakivy/fix-parjoin-for-mt
Browse files Browse the repository at this point in the history
Fix `Stream#parJoin` for short-circuiting monad transformers
  • Loading branch information
SystemFw authored May 10, 2022
2 parents 9db8a22 + 311f196 commit 89c87cd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
24 changes: 16 additions & 8 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4048,17 +4048,25 @@ object Stream extends StreamLowPriority {
.interruptWhen(done.map(_.nonEmpty))
.compile
.drain
.guaranteeCase(oc =>
lease.cancel
.flatMap(onOutcome(oc, _)) >> available.release >> decrementRunning
)
.guaranteeCase { oc =>
lease.cancel.rethrow
.guaranteeCase {
case Outcome.Succeeded(fu) =>
onOutcome(oc <* Outcome.succeeded(fu), Either.unit)

case Outcome.Errored(e) =>
onOutcome(oc, Either.left(e))

case _ =>
F.unit
}
.forceR(available.release >> decrementRunning)
}
.handleError(_ => ())
}.void
}
}

val RightUnit = Right(())

def runOuter: F[Unit] =
F.uncancelable { _ =>
outer
Expand All @@ -4071,7 +4079,7 @@ object Stream extends StreamLowPriority {
.interruptWhen(done.map(_.nonEmpty))
.compile
.drain
.guaranteeCase(onOutcome(_, RightUnit) >> decrementRunning)
.guaranteeCase(onOutcome(_, Either.unit) >> decrementRunning)
.handleError(_ => ())
}

Expand Down Expand Up @@ -4107,7 +4115,7 @@ object Stream extends StreamLowPriority {
signalResult(fiber)
}
.flatMap { _ =>
output.stream.flatMap(Stream.chunk(_).covary[F])
output.stream.flatMap(Stream.chunk)
}
}

Expand Down
24 changes: 24 additions & 0 deletions core/shared/src/test/scala/fs2/StreamParJoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,30 @@ class StreamParJoinSuite extends Fs2Suite {
}
}

test(
"do not block while evaluating a stream in EitherT[IO, Throwable, *] with multiple parJoins"
) {
case object TestException extends Throwable with NoStackTrace
type F[A] = EitherT[IO, Throwable, A]

Stream
.range(1, 64)
.covary[F]
.map(i =>
Stream.eval[F, Unit](
if (i == 7) EitherT.leftT(TestException)
else EitherT.right(IO.unit)
)
)
.parJoin(4)
.map(_ => Stream.eval[F, Unit](EitherT.right(IO.unit)))
.parJoin(4)
.compile
.drain
.value
.assertEquals(Left(TestException))
}

test("do not block while evaluating an EitherT.left outer stream") {
case object TestException extends Throwable with NoStackTrace

Expand Down

0 comments on commit 89c87cd

Please sign in to comment.