Skip to content

Commit 2a6561c

Browse files
authored
rxKotlin for rxJava2 (#87)
* Dependencies updated * Initial implementation of rxKotlin for rxJava2 * Code cleaning. Code review related fixes. * Whitespaces removed. * gitignore update * subscribers replaced by named args extension * subscribeBy non-null params * empty* methods removed flowable extensions added minor fixes * dependencies updated * subscribeBy method for Flowable * tests updated * minor tests refactoring
1 parent 2a00c0e commit 2a6561c

30 files changed

+910
-863
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,4 @@ bin/
6969
# Scala build
7070
*.cache
7171
/.nb-gradle/private/
72+
local.properties

RELEASING.md

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Release Process
2+
===============
3+
4+
1. Ensure `VERSION_NAME` in `gradle.properties` is set to the version you want to release.
5+
2. Add an entry in `CHANGELOG.md` with the changes for the release.
6+
3. Update `README.md` with the version about to be released. Also update the RxJava version in
7+
this file to its latest.
8+
4. Update the RxJava version in `rxkotlin/build.gradle` to its latest. (We tell people that we
9+
won't be tracking RxJava releases, and we don't, but we do it anyway when we are releasing for
10+
those who ignore the advice.)
11+
5. Commit: `git commit -am "Prepare version X.Y.X"`
12+
6. Tag: `git tag -a X.Y.Z -m "Version X.Y.Z"`
13+
7. Update `VERSION_NAME` in `gradle.properties` to the next development version. For example, if
14+
you just tagged version 1.0.4 you would set this value to 1.0.5. Do NOT append "-SNAPSHOT" to
15+
this value, it will be added automatically.
16+
8. Commit: `git commit -am "Prepare next development version."`
17+
9. Push: `git push && git push --tags`
18+
10. Paste the `CHANGELOG.md` contents for this version into a Release on GitHub along with the
19+
Groovy for depending on the new version (https://github.com/ReactiveX/RxKotlin/releases).

build.gradle

+11-9
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
buildscript {
2-
ext.kotlin_version = '1.0.2'
3-
repositories { jcenter() }
4-
dependencies { classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:2.+',
5-
"org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" }
2+
ext.kotlin_version = '1.0.6'
3+
repositories { jcenter() }
4+
dependencies {
5+
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
6+
"org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
7+
}
68
}
79

8-
apply plugin: 'rxjava-project'
10+
apply plugin: 'nebula.rxjava-project'
911
apply plugin: 'kotlin'
1012

1113
dependencies {
12-
compile 'io.reactivex:rxjava:1.1.5'
14+
compile 'io.reactivex.rxjava2:rxjava:2.0.5'
1315
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
14-
testCompile 'org.funktionale:funktionale:0.8'
16+
testCompile 'org.funktionale:funktionale:0.9'
1517
testCompile 'junit:junit:4.12'
16-
testCompile 'org.mockito:mockito-core:1.8.5'
18+
testCompile 'org.mockito:mockito-core:1.10.19'
1719
examplesCompile 'com.squareup.retrofit:retrofit:1.9.+'
1820
}
1921

2022
task wrapper(type: Wrapper) {
21-
gradleVersion = '2.2.1'
23+
gradleVersion = '2.14'
2224
}
2325

2426
// support for snapshot/final releases with the various branches RxJava uses

gradle/wrapper/gradle-wrapper.jar

3.12 KB
Binary file not shown.
+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Sat Mar 07 02:11:57 MSK 2015
1+
#Sun Feb 12 16:33:43 SGT 2017
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-all.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip

gradlew

+41-33
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,35 @@
1-
#!/usr/bin/env bash
1+
#!/usr/bin/env sh
22

33
##############################################################################
44
##
55
## Gradle start up script for UN*X
66
##
77
##############################################################################
88

9-
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
10-
DEFAULT_JVM_OPTS=""
9+
# Attempt to set APP_HOME
10+
# Resolve links: $0 may be a link
11+
PRG="$0"
12+
# Need this for relative symlinks.
13+
while [ -h "$PRG" ] ; do
14+
ls=`ls -ld "$PRG"`
15+
link=`expr "$ls" : '.*-> \(.*\)$'`
16+
if expr "$link" : '/.*' > /dev/null; then
17+
PRG="$link"
18+
else
19+
PRG=`dirname "$PRG"`"/$link"
20+
fi
21+
done
22+
SAVED="`pwd`"
23+
cd "`dirname \"$PRG\"`/" >/dev/null
24+
APP_HOME="`pwd -P`"
25+
cd "$SAVED" >/dev/null
1126

1227
APP_NAME="Gradle"
1328
APP_BASE_NAME=`basename "$0"`
1429

30+
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
31+
DEFAULT_JVM_OPTS=""
32+
1533
# Use the maximum available, or set MAX_FD != -1 to use that value.
1634
MAX_FD="maximum"
1735

@@ -30,6 +48,7 @@ die ( ) {
3048
cygwin=false
3149
msys=false
3250
darwin=false
51+
nonstop=false
3352
case "`uname`" in
3453
CYGWIN* )
3554
cygwin=true
@@ -40,31 +59,11 @@ case "`uname`" in
4059
MINGW* )
4160
msys=true
4261
;;
62+
NONSTOP* )
63+
nonstop=true
64+
;;
4365
esac
4466

45-
# For Cygwin, ensure paths are in UNIX format before anything is touched.
46-
if $cygwin ; then
47-
[ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
48-
fi
49-
50-
# Attempt to set APP_HOME
51-
# Resolve links: $0 may be a link
52-
PRG="$0"
53-
# Need this for relative symlinks.
54-
while [ -h "$PRG" ] ; do
55-
ls=`ls -ld "$PRG"`
56-
link=`expr "$ls" : '.*-> \(.*\)$'`
57-
if expr "$link" : '/.*' > /dev/null; then
58-
PRG="$link"
59-
else
60-
PRG=`dirname "$PRG"`"/$link"
61-
fi
62-
done
63-
SAVED="`pwd`"
64-
cd "`dirname \"$PRG\"`/" >&-
65-
APP_HOME="`pwd -P`"
66-
cd "$SAVED" >&-
67-
6867
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
6968

7069
# Determine the Java command to use to start the JVM.
@@ -90,7 +89,7 @@ location of your Java installation."
9089
fi
9190

9291
# Increase the maximum file descriptors if we can.
93-
if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
92+
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
9493
MAX_FD_LIMIT=`ulimit -H -n`
9594
if [ $? -eq 0 ] ; then
9695
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
@@ -114,6 +113,7 @@ fi
114113
if $cygwin ; then
115114
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
116115
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
116+
JAVACMD=`cygpath --unix "$JAVACMD"`
117117

118118
# We build the pattern for arguments to be converted via cygpath
119119
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
@@ -154,11 +154,19 @@ if $cygwin ; then
154154
esac
155155
fi
156156

157-
# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
158-
function splitJvmOpts() {
159-
JVM_OPTS=("$@")
157+
# Escape application args
158+
save ( ) {
159+
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
160+
echo " "
160161
}
161-
eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
162-
JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
162+
APP_ARGS=$(save "$@")
163+
164+
# Collect all arguments for the java command, following the shell quoting and substitution rules
165+
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
166+
167+
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
168+
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
169+
cd "$(dirname "$0")"
170+
fi
163171

164-
exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
172+
exec "$JAVACMD" "$@"

gradlew.bat

+4-10
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
@rem Set local scope for the variables with windows NT shell
99
if "%OS%"=="Windows_NT" setlocal
1010

11-
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
12-
set DEFAULT_JVM_OPTS=
13-
1411
set DIRNAME=%~dp0
1512
if "%DIRNAME%" == "" set DIRNAME=.
1613
set APP_BASE_NAME=%~n0
1714
set APP_HOME=%DIRNAME%
1815

16+
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
17+
set DEFAULT_JVM_OPTS=
18+
1919
@rem Find java.exe
2020
if defined JAVA_HOME goto findJavaFromJavaHome
2121

@@ -46,10 +46,9 @@ echo location of your Java installation.
4646
goto fail
4747

4848
:init
49-
@rem Get command-line arguments, handling Windowz variants
49+
@rem Get command-line arguments, handling Windows variants
5050

5151
if not "%OS%" == "Windows_NT" goto win9xME_args
52-
if "%@eval[2+2]" == "4" goto 4NT_args
5352

5453
:win9xME_args
5554
@rem Slurp the command line arguments.
@@ -60,11 +59,6 @@ set _SKIP=2
6059
if "x%~1" == "x" goto execute
6160

6261
set CMD_LINE_ARGS=%*
63-
goto execute
64-
65-
:4NT_args
66-
@rem Get arguments from the 4NT Shell from JP Software
67-
set CMD_LINE_ARGS=%$
6862

6963
:execute
7064
@rem Setup the command line
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
11
package rx.lang.kotlin.examples
22

3-
import rx.Observable
4-
import rx.lang.kotlin.*
5-
import rx.subscriptions.CompositeSubscription
3+
import io.reactivex.Observable
4+
import io.reactivex.disposables.CompositeDisposable
5+
import rx.lang.kotlin.addTo
6+
import rx.lang.kotlin.combineLatest
7+
import rx.lang.kotlin.observable
8+
import rx.lang.kotlin.plusAssign
9+
import rx.lang.kotlin.subscribeBy
10+
import rx.lang.kotlin.toObservable
11+
import rx.lang.kotlin.zip
612
import java.net.URL
7-
import java.util.*
13+
import java.util.Scanner
814
import java.util.concurrent.TimeUnit
915
import kotlin.concurrent.thread
1016

1117
fun main(args: Array<String>) {
1218

13-
val subscription = CompositeSubscription()
19+
val subscription = CompositeDisposable()
1420

1521
val printArticle = { art: String ->
1622
println("--- Article ---\n${art.substring(0, 125)}")
1723
}
1824

25+
@Suppress("ConvertLambdaToReference")
1926
val printIt = { it: String -> println(it) }
2027

2128
subscription += asyncObservable().subscribe(printIt)
@@ -36,80 +43,80 @@ fun main(args: Array<String>) {
3643

3744
zip(listOfObservables())
3845

39-
simpleObservable().subscribe(FunctionSubscriber<String>()
40-
.onNext { s -> println("1st onNext => $s") }
41-
.onNext { s -> println("2nd onNext => $s") })
46+
simpleObservable().subscribeBy(
47+
onNext = { s: String -> println("1st onNext => $s") } andThen { println("2nd onNext => $it") }
48+
)
4249

4350
addToCompositeSubscription()
4451
}
4552

4653
private fun URL.toScannerObservable() = observable<String> { s ->
4754
this.openStream().use { stream ->
48-
Scanner(stream).useDelimiter("\\A").toObservable().subscribe(s)
55+
Scanner(stream).useDelimiter("\\A")
56+
.toObservable()
57+
.subscribe { s.onNext(it) }
4958
}
5059
}
5160

52-
fun syncObservable(): Observable<String> =
53-
observable { subscriber ->
54-
(0..75).toObservable()
55-
.map { "Sync value_$it" }
56-
.subscribe(subscriber)
57-
}
61+
fun syncObservable(): Observable<String> = observable { subscriber ->
62+
(0..75).toObservable()
63+
.map { "Sync value_$it" }
64+
.subscribe { subscriber.onNext(it) }
65+
}
5866

59-
fun asyncObservable(): Observable<String> =
60-
observable { subscriber ->
61-
thread {
62-
(0..75).toObservable()
63-
.map { "Async value_$it" }
64-
.subscribe(subscriber)
65-
}
67+
fun asyncObservable(): Observable<String> = observable { subscriber ->
68+
thread {
69+
(0..75).toObservable()
70+
.map { "Async value_$it" }
71+
.subscribe { subscriber.onNext(it) }
6672
}
73+
}
6774

68-
fun asyncWiki(vararg articleNames: String): Observable<String> =
69-
observable { subscriber ->
70-
thread {
71-
articleNames.toObservable()
72-
.flatMap { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().first() }
73-
.subscribe(subscriber)
74-
}
75+
fun asyncWiki(vararg articleNames: String): Observable<String> = observable { subscriber ->
76+
thread {
77+
articleNames.toObservable()
78+
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
79+
.subscribe { subscriber.onNext(it) }
7580
}
81+
}
7682

77-
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> =
78-
observable { subscriber ->
79-
thread {
80-
articleNames.toObservable()
81-
.flatMap { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().first() }
82-
.onError { e ->
83-
subscriber.onError(e) }
84-
.subscribe(subscriber)
85-
}
83+
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = observable { subscriber ->
84+
thread {
85+
articleNames.toObservable()
86+
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
87+
.subscribe({ subscriber.onNext(it) }, { subscriber.onError(it) })
8688
}
89+
}
8790

8891
fun simpleComposition() {
89-
asyncObservable().skip(10).take(5)
90-
.map { s -> "${s}_xform" }
91-
.subscribe { s -> println("onNext => $s") }
92+
asyncObservable()
93+
.skip(10)
94+
.take(5)
95+
.map { "${it}_xform" }
96+
.subscribe { println("onNext => $it") }
9297
}
9398

9499
fun listOfObservables(): List<Observable<String>> = listOf(syncObservable(), syncObservable())
95100

96101
fun combineLatest(observables: List<Observable<String>>) {
97-
observables.combineLatest { it.reduce { one, two -> one + two } }.subscribe { println(it) }
102+
observables.combineLatest { it.reduce { one, two -> one + two } }.subscribe(::println)
98103
}
99104

100105
fun zip(observables: List<Observable<String>>) {
101-
observables.zip { it.reduce { one, two -> one + two } }.subscribe { println(it) }
106+
observables.zip { it.reduce { one, two -> one + two } }.subscribe(::println)
102107
}
103108

104109
fun simpleObservable(): Observable<String> = (0..17).toObservable().map { "Simple $it" }
105110

106111
fun addToCompositeSubscription() {
107-
val compositeSubscription = CompositeSubscription()
112+
val compositeSubscription = CompositeDisposable()
108113

109114
Observable.just("test")
110115
.delay(100, TimeUnit.MILLISECONDS)
111116
.subscribe()
112117
.addTo(compositeSubscription)
113118

114-
compositeSubscription.unsubscribe()
115-
}
119+
compositeSubscription.dispose()
120+
}
121+
122+
infix inline fun <T : Any> ((T) -> Unit).andThen(crossinline block: (T) -> Unit): (T) -> Unit = { this(it); block(it) }

0 commit comments

Comments
 (0)