Skip to content

Commit

Permalink
Channel - merge
Browse files Browse the repository at this point in the history
  • Loading branch information
SystemFw committed Mar 7, 2021
1 parent 1f87736 commit ef57231
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1859,22 +1859,19 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
// then close te queue (by putting a None in it)
val doneAndClose: F2[Unit] = otherSideDone.getAndSet(true).flatMap {
// complete only if other side is done too.
case true => resultQ.offer(None)
case true => resultChan.close.void
case false => F.unit
}

// stream that is generated from pumping out the elements of the queue.
val pumpFromQueue: Stream[F2, O2] = Stream.fromQueueNoneTerminated(resultQ).flatten

// action to interrupt the processing of both streams by completing interrupt
// We need to use `attempt` because `interruption` may already be completed.
val signalInterruption: F2[Unit] = interrupt.complete(()).void

def go(s: Stream[F2, O2], guard: Semaphore[F2]): Pull[F2, O2, Unit] =
Pull.eval(guard.acquire) >> s.pull.uncons.flatMap {
case Some((hd, tl)) =>
val enq = resultQ.offer(Some(Stream.chunk(hd).onFinalize(guard.release)))
Pull.eval(enq) >> go(tl, guard)
val send = resultChan.send(Stream.chunk(hd).onFinalize(guard.release))
Pull.eval(send) >> go(tl, guard)
case None => Pull.done
}

Expand All @@ -1899,7 +1896,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,

val runStreams = runStream(this, resultL).start >> runStream(that, resultR).start

Stream.bracket(runStreams)(_ => atRunEnd) >> watchInterrupted(pumpFromQueue)
Stream.bracket(runStreams)(_ => atRunEnd) >> watchInterrupted(resultChan.stream.flatten)
}
Stream.eval(fstream).flatten
}
Expand Down

0 comments on commit ef57231

Please sign in to comment.