Skip to content

Commit 245ec49

Browse files
bmaslakovstepango
authored andcommitted
Add extension fuctions to concat Iterables (#142)
* Add extension fuctions to concat Iterable<Single>, Iterable<Maybe> and Iterable<Completable> * Add extension fuctions to concat Iterable<Flowable>, update descriptions * Rename concat* extension functions to concatAll * Add Iterable<Observable>.concatAll(), rename tests
1 parent 49b9d36 commit 245ec49

File tree

10 files changed

+83
-2
lines changed

10 files changed

+83
-2
lines changed

src/main/kotlin/io/reactivex/rxkotlin/completable.kt

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Completable
4+
import io.reactivex.CompletableSource
45
import io.reactivex.Flowable
56
import io.reactivex.Observable
67
import io.reactivex.functions.Action
@@ -24,3 +25,8 @@ fun Observable<Completable>.mergeAllCompletables() = flatMapCompletable { it }
2425
* Merges the emissions of a Flowable<Completable>. Same as calling `flatMap { it }`.
2526
*/
2627
fun Flowable<Completable>.mergeAllCompletables() = flatMapCompletable { it }
28+
29+
/**
30+
* Concats an Iterable of completables into flowable. Same as calling `Completable.concat(this)`
31+
*/
32+
fun Iterable<CompletableSource>.concatAll() = Completable.concat(this)

src/main/kotlin/io/reactivex/rxkotlin/flowable.kt

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.reactivex.rxkotlin
33
import io.reactivex.Flowable
44
import io.reactivex.functions.BiFunction
55
import io.reactivex.functions.Function3
6+
import org.reactivestreams.Publisher
67

78

89
fun BooleanArray.toFlowable(): Flowable<Boolean> = asIterable().toFlowable()
@@ -108,4 +109,9 @@ fun <A: Any, B: Any> Flowable<Pair<A, B>>.toMap() = toMap({it.first},{it.second}
108109
/**
109110
* Collects `Pair` emission into a multimap
110111
*/
111-
fun <A: Any, B: Any> Flowable<Pair<A, B>>.toMultimap() = toMultimap({it.first},{it.second})
112+
fun <A: Any, B: Any> Flowable<Pair<A, B>>.toMultimap() = toMultimap({it.first},{it.second})
113+
114+
/**
115+
* Concats an Iterable of flowables into flowable. Same as calling `Flowable.concat(this)`
116+
*/
117+
fun <T : Any> Iterable<Publisher<T>>.concatAll() = Flowable.concat(this)

src/main/kotlin/io/reactivex/rxkotlin/maybe.kt

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.reactivex.rxkotlin
22

33
import io.reactivex.Flowable
44
import io.reactivex.Maybe
5+
import io.reactivex.MaybeSource
56
import io.reactivex.Observable
67
import java.util.concurrent.Callable
78
import java.util.concurrent.Future
@@ -27,3 +28,8 @@ fun <T : Any> Observable<Maybe<T>>.mergeAllMaybes() = flatMapMaybe { it }
2728
* Merges the emissions of a Flowable<Maybe<T>>. Same as calling `flatMap { it }`.
2829
*/
2930
fun <T : Any> Flowable<Maybe<T>>.mergeAllMaybes() = flatMapMaybe { it }
31+
32+
/**
33+
* Concats an Iterable of maybes into flowable. Same as calling `Maybe.concat(this)`
34+
*/
35+
fun <T : Any> Iterable<MaybeSource<T>>.concatAll() = Maybe.concat(this)

src/main/kotlin/io/reactivex/rxkotlin/observable.kt

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
4+
import io.reactivex.ObservableSource
45
import io.reactivex.functions.BiFunction
56
import io.reactivex.functions.Function3
67

@@ -101,4 +102,6 @@ fun <A: Any, B: Any> Observable<Pair<A,B>>.toMap() = toMap({it.first},{it.second
101102
/**
102103
* Collects `Pair` emission into a multimap
103104
*/
104-
fun <A: Any, B: Any> Observable<Pair<A,B>>.toMultimap() = toMultimap({it.first},{it.second})
105+
fun <A: Any, B: Any> Observable<Pair<A,B>>.toMultimap() = toMultimap({it.first},{it.second})
106+
107+
fun <T : Any> Iterable<ObservableSource<T>>.concatAll() = Observable.concat(this)

src/main/kotlin/io/reactivex/rxkotlin/single.kt

+6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.reactivex.rxkotlin
33
import io.reactivex.Flowable
44
import io.reactivex.Observable
55
import io.reactivex.Single
6+
import io.reactivex.SingleSource
67
import java.util.concurrent.Callable
78
import java.util.concurrent.Future
89

@@ -25,3 +26,8 @@ fun <T : Any> Observable<Single<T>>.mergeAllSingles() = flatMapSingle { it }
2526
* Merges the emissions of a Flowable<Single<T>>. Same as calling `flatMap { it }`.
2627
*/
2728
fun <T : Any> Flowable<Single<T>>.mergeAllSingles() = flatMapSingle { it }
29+
30+
/**
31+
* Concats an Iterable of singles into flowable. Same as calling `Single.concat(this)`
32+
*/
33+
fun <T : Any> Iterable<SingleSource<T>>.concatAll() = Single.concat(this)

src/test/kotlin/io/reactivex/rxkotlin/CompletableTest.kt

+12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.reactivex.rxkotlin
22

3+
import io.reactivex.Completable
34
import io.reactivex.Single
45
import io.reactivex.functions.Action
6+
import org.junit.Assert
57
import org.junit.Assert.assertEquals
68
import org.junit.Assert.assertNotNull
79
import org.junit.Test
@@ -40,4 +42,14 @@ class CompletableTest {
4042
c1.toObservable<String>().blockingFirst()
4143
}
4244

45+
@Test fun testConcatAll() {
46+
var list = emptyList<Int>()
47+
(0 until 10)
48+
.map { v -> Completable.create { list += v } }
49+
.concatAll()
50+
.subscribe {
51+
Assert.assertEquals((0 until 10).toList(), list)
52+
}
53+
}
54+
4355
}

src/test/kotlin/io/reactivex/rxkotlin/FlowableTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,14 @@ class FlowableTest {
199199

200200
testSubscriber.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
201201
}
202+
203+
@Test fun testConcatAll() {
204+
(0 until 10)
205+
.map { Flowable.just(it) }
206+
.concatAll()
207+
.toList()
208+
.subscribe { result ->
209+
Assert.assertEquals((0 until 10).toList(), result)
210+
}
211+
}
202212
}

src/test/kotlin/io/reactivex/rxkotlin/MaybeTest.kt

+10
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,14 @@ class MaybeTest {
1616
}
1717
Assert.assertTrue(first.get() == "Alpha")
1818
}
19+
20+
@Test fun testConcatAll() {
21+
(0 until 10)
22+
.map { Maybe.just(it) }
23+
.concatAll()
24+
.toList()
25+
.subscribe { result ->
26+
Assert.assertEquals((0 until 10).toList(), result)
27+
}
28+
}
1929
}

src/test/kotlin/io/reactivex/rxkotlin/ObservableTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
44
import io.reactivex.observers.TestObserver
5+
import org.junit.Assert
56
import org.junit.Assert.*
67
import org.junit.Ignore
78
import org.junit.Test
@@ -223,4 +224,14 @@ class ObservableTest {
223224
testObserver.assertValues(Triple("Alpha",1, 100), Triple("Beta",2, 200), Triple("Gamma",3, 300))
224225
}
225226

227+
@Test fun testConcatAll() {
228+
var counter = 0
229+
(0 until 10)
230+
.map { Observable.just(counter++, counter++, counter++) }
231+
.concatAll()
232+
.toList()
233+
.subscribe { result ->
234+
Assert.assertEquals((0 until 30).toList(), result)
235+
}
236+
}
226237
}

src/test/kotlin/io/reactivex/rxkotlin/SingleTest.kt

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.reactivex.rxkotlin
22

33
import io.reactivex.Observable
44
import io.reactivex.Single
5+
import org.junit.Assert
56
import org.junit.Test
67
import org.mockito.Mockito
78
import org.mockito.Mockito.mock
@@ -56,4 +57,14 @@ class SingleTest : KotlinTests() {
5657
verify(a, Mockito.times(1))
5758
.received("Alpha")
5859
}
60+
61+
@Test fun testConcatAll() {
62+
(0 until 10)
63+
.map { Single.just(it) }
64+
.concatAll()
65+
.toList()
66+
.subscribe { result ->
67+
Assert.assertEquals((0 until 10).toList(), result)
68+
}
69+
}
5970
}

0 commit comments

Comments
 (0)