Skip to content
This repository was archived by the owner on Jan 13, 2025. It is now read-only.

Commit b516749

Browse files
committed
Merge branch 'series/2.x'
2 parents deaaeb6 + 2f25e15 commit b516749

File tree

13 files changed

+633
-465
lines changed

13 files changed

+633
-465
lines changed

.github/workflows/ci.yml

+4-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ name: CI
33
on:
44
pull_request:
55
push:
6-
branches: ['master']
6+
branches: [
7+
'master'
8+
]
79
release:
810
types:
911
- published
@@ -31,7 +33,7 @@ jobs:
3133
fail-fast: false
3234
matrix:
3335
34-
scala: ['2.11.12', '2.12.15', '2.13.7', '3.1.0']
36+
scala: ['2.11.12', '2.12.15', '2.13.8', '3.1.0']
3537
steps:
3638
- name: Checkout current branch
3739
uses: actions/[email protected]

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ project/secrets.tar.xz
1111
project/travis-deploy-key
1212
project/zecret
1313
target
14-
test-output
14+
test-output/
15+
metals.sbt

.scalafmt.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
version = "3.0.4"
2-
runner.dialect = scala213
2+
runner.dialect = scala213source3
33
maxColumn = 120
44
align.preset = most
55
align.multiline = false

README.md

+15-22
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![Releases][Badge-SonatypeReleases]][Link-SonatypeReleases]
66
[![Snapshots][Badge-SonatypeSnapshots]][Link-SonatypeSnapshots]
77

8-
This library provides an interoperability layer for reactive streams.
8+
This library provides an interoperability layer between ZIO and reactive streams.
99

1010
## Reactive Streams `Producer` and `Subscriber`
1111

@@ -22,8 +22,6 @@ import org.reactivestreams.example.unicast._
2222
import zio._
2323
import zio.interop.reactivestreams._
2424
import zio.stream._
25-
26-
val runtime = new DefaultRuntime {}
2725
```
2826

2927
We use the following `Publisher` and `Subscriber` for the examples:
@@ -44,37 +42,32 @@ A `Publisher` used as a `Stream` buffers up to `qSize` elements. If possible, `q
4442
a power of two for best performance. The default is 16.
4543

4644
```scala mdoc
47-
val streamFromPublisher = publisher.toStream(qSize = 16)
48-
runtime.unsafeRun(
49-
streamFromPublisher.run(Sink.collectAll[Integer])
50-
)
45+
val streamFromPublisher = publisher.toZIOStream(qSize = 16)
46+
streamFromPublisher.run(Sink.collectAll[Integer])
5147
```
5248

5349
### Subscriber to Sink
5450

5551
When running a `Stream` to a `Subscriber`, a side channel is needed for signalling failures.
56-
For this reason `toSink` returns a tuple of `Promise` and `Sink`. The `Promise` must be failed
57-
on `Stream` failure. The type parameter on `toSink` is the error type of *the Stream*.
52+
For this reason `toZIOSink` returns a tuple of a callback and a `Sink`. The callback must be used to signal `Stream` failure. The type parameter on `toZIOSink` is the error type of *the Stream*.
5853

5954
```scala mdoc
60-
val asSink = subscriber.toSink[Throwable]
61-
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
62-
runtime.unsafeRun(
63-
asSink.flatMap { case (errorP, sink) =>
64-
failingStream.run(sink).catchAll(errorP.fail)
55+
val asSink = subscriber.toZIOSink[Throwable]
56+
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
57+
ZIO.scoped {
58+
asSink.flatMap { case (signalError, sink) => // FIXME
59+
failingStream.run(sink).catchAll(signalError)
6560
}
66-
)
61+
}
6762
```
6863

6964
### Stream to Publisher
7065

7166
```scala mdoc
7267
val stream = Stream.range(3, 13)
73-
runtime.unsafeRun(
74-
stream.toPublisher.flatMap { publisher =>
75-
UIO(publisher.subscribe(subscriber))
76-
}
77-
)
68+
stream.toPublisher.flatMap { publisher =>
69+
UIO(publisher.subscribe(subscriber))
70+
}
7871
```
7972

8073
### Sink to Subscriber
@@ -86,11 +79,11 @@ a power of two for best performance. The default is 16.
8679

8780
```scala mdoc
8881
val sink = Sink.collectAll[Integer]
89-
runtime.unsafeRun(
82+
ZIO.scoped {
9083
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
9184
UIO(publisher.subscribe(subscriber)) *> result
9285
}
93-
)
86+
}
9487
```
9588

9689
[Badge-CI]: https://github.com/zio/interop-reactive-streams/workflows/CI/badge.svg

build.sbt

+5-3
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
3636
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
3737
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
3838

39-
val zioVersion = "1.0.15"
40-
val rsVersion = "1.0.3"
41-
val collCompatVersion = "2.5.0"
39+
val zioVersion = "2.0.0"
40+
val rsVersion = "1.0.4"
41+
val collCompatVersion = "2.7.0"
4242

4343
lazy val interopReactiveStreams = project
4444
.in(file("."))
@@ -63,3 +63,5 @@ lazy val interopReactiveStreams = project
6363
Seq("org.scala-lang.modules" %% "scala-collection-compat" % collCompatVersion % Test)
6464
}
6565
)
66+
// .settings(Test / javaOptions += "-XX:ActiveProcessorCount=1") // uncomment to test for deadlocks
67+
.settings(Test / fork := true)

project/BuildHelper.scala

+1-25
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import scalafix.sbt.ScalafixPlugin.autoImport._
1010
object BuildHelper {
1111
val Scala211 = "2.11.12"
1212
val Scala212 = "2.12.15"
13-
val Scala213 = "2.13.7"
13+
val Scala213 = "2.13.8"
1414
val ScalaDotty = "3.1.0"
1515

1616
private val stdOptions = Seq(
@@ -189,30 +189,6 @@ object BuildHelper {
189189
case _ =>
190190
Nil
191191
}
192-
},
193-
Test / unmanagedSourceDirectories ++= {
194-
CrossVersion.partialVersion(scalaVersion.value) match {
195-
case Some((2, x)) if x <= 11 =>
196-
Seq(
197-
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.11")),
198-
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-2.x"))
199-
).flatten
200-
case Some((2, x)) if x >= 12 =>
201-
Seq(
202-
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12")),
203-
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12+")),
204-
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-2.x"))
205-
).flatten
206-
case Some((3, _)) =>
207-
Seq(
208-
Seq(file(sourceDirectory.value.getPath + "/test/scala-2.12+")),
209-
CrossType.Full.sharedSrcDir(baseDirectory.value, "main").toList.map(f => file(f.getPath + "-2.12+")),
210-
CrossType.Full.sharedSrcDir(baseDirectory.value, "test").toList.map(f => file(f.getPath + "-dotty"))
211-
).flatten
212-
case _ =>
213-
Nil
214-
}
215-
216192
}
217193
)
218194

project/build.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.5.5
1+
sbt.version=1.6.2

0 commit comments

Comments
 (0)