Skip to content

Commit

Permalink
fix bug in flatmappar (zio#5325)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamgfraser authored Jul 27, 2021
1 parent 31aa237 commit ec493b2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2122,6 +2122,13 @@ object ZStreamSpec extends ZIOBaseSpec {
} yield assert(count)(equalTo(2)) && assert(result)(fails(equalTo("Boom")))
} @@ nonFlaky
),
suite("mapMParUnordered")(
testM("mapping with failure is failure") {
val stream =
ZStream.fromIterable(0 to 3).mapMParUnordered(10)(_ => ZIO.fail("fail"))
assertM(stream.runDrain.run)(fails(equalTo("fail")))
} @@ nonFlaky
),
suite("mergeTerminateLeft")(
testM("terminates as soon as the first stream terminates") {
for {
Expand Down
16 changes: 15 additions & 1 deletion streams/shared/src/main/scala/zio/stream/ZStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1431,8 +1431,22 @@ abstract class ZStream[-R, +E, +O](val process: ZManaged[R, Nothing, ZIO[R, Opti
_ <- self.foreachManaged { a =>
for {
latch <- Promise.make[Nothing, Unit]
// If the inner stream completed successfully, release the permit so another
// stream can be executed. Otherwise, signal failure to the outer stream.
withPermitManaged = ZManaged.makeReserve {
permits.withPermitManaged[Any, Nothing].reserve.map {
case Reservation(acquire, release) =>
Reservation(
acquire,
_.fold(
cause => innerFailure.fail(cause.stripFailures),
_ => release(Exit.unit)
)
)
}
}
innerStream = ZStream
.managed(permits.withPermitManaged)
.managed(withPermitManaged)
.tap(_ => latch.succeed(()))
.flatMap(_ => f(a))
.foreachChunk(b => out.offer(UIO.succeedNow(b)).unit)
Expand Down

0 comments on commit ec493b2

Please sign in to comment.