Skip to content

Commit 72dcb2a

Browse files
Updated Kotlin and the libs for coroutines (#200)
* Migrated Project to Kotlin 1.5.30 * Updated scarlet-stream-adapter-coroutines to be compatible with 1.5.30 as well regarding use of asFlow and channels * Updated coroutines to 1.5.2 * scarlet-stream-adapter-coroutines produces hot flow and hot receive channel Fixes #169 Fixes #163 Fixes #27 * ChannelForwarder now calls dispose correctly on Completed and Error * Using new trySendBlocking in onNext callback * Updating Tests to include FlowStreamAdapterTest * Reving kotlin coroutines 1.6.0 * Updating circleci configuration to stash the correct test reports * Updating to coroutines 1.4 and higher removes the reactive streams behavior of the past Relied on a subscription and a buffered observer, adding buffering * Documentation * No need to rename files * Use supervisor job instead of globalscope * Proper variable naming * Channel Forwarder should not throw
1 parent 6d98e7d commit 72dcb2a

File tree

17 files changed

+400
-191
lines changed

17 files changed

+400
-191
lines changed

.circleci/config.yml

+7-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,14 @@ jobs:
1515
name: Disable PreDexing
1616
command: echo "disablePreDex" >> gradle.properties
1717
- run: if [ -e ./gradlew ]; then ./gradlew dependencies;else gradle dependencies;fi
18+
- run: mkdir -p test-reports/junit/
1819
- run: ./gradlew test
19-
- run: mkdir -p $CIRCLE_TEST_REPORTS/junit/
20-
- run: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} $CIRCLE_TEST_REPORTS/junit/ \;
20+
- run:
21+
name: gather_test_results
22+
command: find . -type f -regex ".*/build/test-results/.*xml" -exec cp {} test-reports/junit/ \;
23+
when: always
24+
- store_test_results:
25+
path: test-reports
2126
- run:
2227
name: Deploy Snapshot
2328
command: ./publish.sh

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Top-level build file where you can add configuration options common to all sub-projects/modules.
22

33
buildscript {
4-
ext.kotlin_version = '1.4.31'
4+
ext.kotlin_version = '1.5.30'
55
ext.dokka_version = '0.9.16'
66
repositories {
77
mavenCentral()
@@ -10,7 +10,7 @@ buildscript {
1010
}
1111
dependencies {
1212
classpath 'com.vanniktech:gradle-maven-publish-plugin:0.8.0'
13-
classpath 'com.android.tools.build:gradle:4.1.2'
13+
classpath 'com.android.tools.build:gradle:4.1.3'
1414
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
1515
}
1616
}

demo/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ dependencies {
3333
implementation project(':scarlet-message-adapter-moshi')
3434
implementation project(':scarlet-message-adapter-protobuf')
3535
implementation project(':scarlet-stream-adapter-rxjava2')
36+
implementation project(':scarlet-stream-adapter-coroutines')
3637

3738
implementation libs.appCompat
3839
implementation libs.material

demo/src/main/java/com/tinder/app/echo/api/EchoService.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,20 @@ import com.tinder.scarlet.Event
99
import com.tinder.scarlet.State
1010
import com.tinder.scarlet.ws.Receive
1111
import com.tinder.scarlet.ws.Send
12-
import io.reactivex.Flowable
12+
import kotlinx.coroutines.flow.Flow
1313

1414
interface EchoService {
1515
@Receive
16-
fun observeState(): Flowable<State>
16+
fun observeState(): Flow<State>
1717

1818
@Receive
19-
fun observeEvent(): Flowable<Event>
19+
fun observeEvent(): Flow<Event>
2020

2121
@Receive
22-
fun observeText(): Flowable<String>
22+
fun observeText(): Flow<String>
2323

2424
@Receive
25-
fun observeBitmap(): Flowable<Bitmap>
25+
fun observeBitmap(): Flow<Bitmap>
2626

2727
@Send
2828
fun sendText(message: String): Boolean

demo/src/main/java/com/tinder/app/echo/domain/ChatMessageRepository.kt

+17-15
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import com.tinder.scarlet.State
1313
import com.tinder.scarlet.WebSocket
1414
import io.reactivex.Flowable
1515
import io.reactivex.processors.BehaviorProcessor
16-
import io.reactivex.schedulers.Schedulers
16+
import kotlinx.coroutines.CoroutineScope
17+
import kotlinx.coroutines.flow.catch
18+
import kotlinx.coroutines.flow.launchIn
19+
import kotlinx.coroutines.flow.onEach
1720
import org.joda.time.DateTime
1821
import timber.log.Timber
1922
import java.util.concurrent.atomic.AtomicInteger
@@ -22,16 +25,16 @@ import javax.inject.Inject
2225

2326
@EchoBotScope
2427
class ChatMessageRepository @Inject constructor(
25-
private val echoService: EchoService
28+
private val echoService: EchoService,
29+
private val coroutineScope: CoroutineScope
2630
) {
2731
private val messageCount = AtomicInteger()
2832
private val messagesRef = AtomicReference<List<ChatMessage>>()
2933
private val messagesProcessor = BehaviorProcessor.create<List<ChatMessage>>()
3034

3135
init {
3236
echoService.observeEvent()
33-
.observeOn(Schedulers.io())
34-
.subscribe({ event ->
37+
.onEach { event ->
3538
val description = when (event) {
3639
is Event.OnLifecycle.StateChange<*> -> when (event.state) {
3740
Lifecycle.State.Started -> "\uD83C\uDF1D On Lifecycle Start"
@@ -57,39 +60,38 @@ class ChatMessageRepository @Inject constructor(
5760
}
5861
Event.OnRetry -> "⏰ On Retry"
5962
}
60-
val chatMessage = ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
63+
val chatMessage =
64+
ChatMessage.Text(generateMessageId(), description, ChatMessage.Source.RECEIVED)
6165
addChatMessage(chatMessage)
62-
}, { e ->
66+
}.catch { e ->
6367
Timber.e(e)
64-
})
68+
}.launchIn(coroutineScope)
6569

6670
echoService.observeText()
67-
.observeOn(Schedulers.io())
68-
.subscribe({ text ->
71+
.onEach { text ->
6972
val chatMessage = ChatMessage.Text(
7073
generateMessageId(),
7174
text,
7275
ChatMessage.Source.RECEIVED,
7376
DateTime.now().plusMillis(50)
7477
)
7578
addChatMessage(chatMessage)
76-
}, { e ->
79+
}.catch { e ->
7780
Timber.e(e)
78-
})
81+
}.launchIn(coroutineScope)
7982

8083
echoService.observeBitmap()
81-
.observeOn(Schedulers.io())
82-
.subscribe({ bitmap ->
84+
.onEach { bitmap ->
8385
val chatMessage = ChatMessage.Image(
8486
generateMessageId(),
8587
bitmap,
8688
ChatMessage.Source.RECEIVED,
8789
DateTime.now().plusMillis(50)
8890
)
8991
addChatMessage(chatMessage)
90-
}, { e ->
92+
}.catch { e ->
9193
Timber.e(e)
92-
})
94+
}.launchIn(coroutineScope)
9395
}
9496

9597
fun observeChatMessage(): Flowable<List<ChatMessage>> = messagesProcessor

demo/src/main/java/com/tinder/app/echo/inject/EchoBotComponent.kt

+10
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ import com.tinder.scarlet.Scarlet
1414
import com.tinder.scarlet.lifecycle.android.AndroidLifecycle
1515
import com.tinder.scarlet.streamadapter.rxjava2.RxJava2StreamAdapterFactory
1616
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
17+
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
1718
import dagger.Component
1819
import dagger.Module
1920
import dagger.Provides
21+
import kotlinx.coroutines.CoroutineScope
22+
import kotlinx.coroutines.SupervisorJob
2023
import okhttp3.OkHttpClient
2124
import okhttp3.logging.HttpLoggingInterceptor
2225

@@ -59,9 +62,16 @@ interface EchoBotComponent {
5962
.lifecycle(lifecycle)
6063
.addMessageAdapterFactory(BitmapMessageAdapter.Factory())
6164
.addStreamAdapterFactory(RxJava2StreamAdapterFactory())
65+
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
6266
.build()
6367
return scarlet.create()
6468
}
69+
70+
@Provides
71+
@EchoBotScope
72+
fun provideCoroutineScopeToRunIn(): CoroutineScope {
73+
return CoroutineScope(SupervisorJob())
74+
}
6575
}
6676

6777
interface ComponentProvider {

demo/src/main/java/com/tinder/app/gdax/view/GdaxFragment.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class GdaxFragment : Fragment(), GdaxTarget {
101101
setDrawValues(false)
102102
}
103103

104-
val minPrice = priceEntries.minBy { it.y }?.y ?: 0F
104+
val minPrice = priceEntries.minByOrNull { it.y }?.y ?: 0F
105105
val minPriceEntries = listOf(
106106
Entry(minutesAgo.millisOfDay.toFloat(), minPrice),
107107
Entry(now.millisOfDay.toFloat(), minPrice)

dependencies.gradle

+5-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ ext.versions = [
1313
rxAndroid: '2.0.2',
1414
rxKotlin: '2.2.0',
1515
rxJava1: '1.3.4',
16-
kotlinxCoroutines: '1.4.3',
16+
kotlinxCoroutines: '1.6.0',
1717

1818
stetho: '1.5.0',
1919
stethoOkHttp: '1.5.0',
@@ -42,6 +42,7 @@ ext.versions = [
4242

4343
junit: '4.12',
4444
assertJ: '3.8.0',
45+
truth: '1.1.3',
4546

4647
stateMachine: "0.2.0"
4748
]
@@ -61,7 +62,8 @@ ext.libs = [
6162
kotlinx: [
6263
coroutines: [
6364
core: "org.jetbrains.kotlinx:kotlinx-coroutines-core:$versions.kotlinxCoroutines",
64-
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines"
65+
reactive: "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$versions.kotlinxCoroutines",
66+
test: "org.jetbrains.kotlinx:kotlinx-coroutines-test:$versions.kotlinxCoroutines"
6567
]
6668
],
6769

@@ -100,6 +102,7 @@ ext.libs = [
100102
junit: "junit:junit:$versions.junit",
101103
mockito: "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0",
102104
assertJ: "org.assertj:assertj-core:$versions.assertJ",
105+
truth: "com.google.truth:truth:$versions.truth",
103106

104107
kotlin: [
105108
stdlib: "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version",

scarlet-stream-adapter-coroutines/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ dependencies {
1515
testImplementation libs.junit
1616
testImplementation libs.mockito
1717
testImplementation libs.kotlin.reflect
18-
testImplementation libs.assertJ
18+
testImplementation libs.kotlinx.coroutines.test
19+
testImplementation libs.truth
1920
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.tinder.streamadapter.coroutines
2+
3+
import com.tinder.scarlet.Stream
4+
import kotlinx.coroutines.channels.BufferOverflow
5+
import kotlinx.coroutines.channels.Channel
6+
import kotlinx.coroutines.channels.ReceiveChannel
7+
import kotlinx.coroutines.channels.onClosed
8+
import kotlinx.coroutines.channels.onFailure
9+
import kotlinx.coroutines.channels.trySendBlocking
10+
11+
internal class ChannelForwarder<T>(bufferSize: Int) : Stream.Observer<T> {
12+
private val channel = Channel<T>(bufferSize, BufferOverflow.DROP_OLDEST)
13+
private var disposable: Stream.Disposable? = null
14+
15+
fun start(stream: Stream<T>): ReceiveChannel<T> {
16+
disposable = stream.start(this)
17+
return channel
18+
}
19+
20+
override fun onComplete() {
21+
channel.close()
22+
disposable?.dispose()
23+
}
24+
25+
override fun onError(throwable: Throwable) {
26+
channel.close(throwable)
27+
disposable?.dispose()
28+
}
29+
30+
override fun onNext(data: T) {
31+
channel.trySendBlocking(data).onClosed {
32+
// ignore
33+
}.onFailure {
34+
// ignore
35+
}
36+
}
37+
}

scarlet-stream-adapter-coroutines/src/main/java/com/tinder/streamadapter/coroutines/CoroutinesStreamAdapterFactory.kt

+12-4
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,25 @@ package com.tinder.streamadapter.coroutines
77
import com.tinder.scarlet.StreamAdapter
88
import com.tinder.scarlet.utils.getRawType
99
import kotlinx.coroutines.channels.ReceiveChannel
10+
import kotlinx.coroutines.flow.Flow
1011
import java.lang.reflect.Type
1112

1213
/**
13-
* A [stream adapter factory][StreamAdapter.Factory] that uses ReceiveChannel.
14+
* A [stream adapter factory][StreamAdapter.Factory] that allows for [ReceiveChannel]
15+
* and [Flow] based streams.
16+
* [bufferSize] is configurable for the underlying channels, defaults to [DEFAULT_BUFFER]
1417
*/
15-
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {
18+
private const val DEFAULT_BUFFER = 128
19+
20+
class CoroutinesStreamAdapterFactory(
21+
private val bufferSize: Int = DEFAULT_BUFFER
22+
) : StreamAdapter.Factory {
1623

1724
override fun create(type: Type): StreamAdapter<Any, Any> {
1825
return when (type.getRawType()) {
19-
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter()
26+
Flow::class.java -> FlowStreamAdapter(bufferSize)
27+
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter(bufferSize)
2028
else -> throw IllegalArgumentException()
2129
}
2230
}
23-
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
3+
*/
4+
5+
package com.tinder.streamadapter.coroutines
6+
7+
import com.tinder.scarlet.Stream
8+
import com.tinder.scarlet.StreamAdapter
9+
import kotlinx.coroutines.flow.Flow
10+
import kotlinx.coroutines.flow.buffer
11+
import kotlinx.coroutines.flow.receiveAsFlow
12+
13+
class FlowStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, Flow<T>> {
14+
15+
override fun adapt(stream: Stream<T>): Flow<T> {
16+
return ChannelForwarder<T>(buffer).start(stream).receiveAsFlow()
17+
}
18+
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
/*
2-
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
3-
*/
4-
51
package com.tinder.streamadapter.coroutines
62

73
import com.tinder.scarlet.Stream
84
import com.tinder.scarlet.StreamAdapter
95
import kotlinx.coroutines.channels.ReceiveChannel
10-
import kotlinx.coroutines.reactive.openSubscription
116

12-
class ReceiveChannelStreamAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {
7+
class ReceiveChannelStreamAdapter<T>(private val buffer: Int) : StreamAdapter<T, ReceiveChannel<T>> {
138

14-
override fun adapt(stream: Stream<T>) = stream.openSubscription()
15-
}
9+
override fun adapt(stream: Stream<T>): ReceiveChannel<T> {
10+
val channelForwarder = ChannelForwarder<T>(buffer)
11+
return channelForwarder.start(stream)
12+
}
13+
}

0 commit comments

Comments
 (0)