Skip to content

Commit 61daee8

Browse files
committed
Upgrade to RxJava 2.0.7 and fix internal uses
1 parent 1fad3e2 commit 61daee8

File tree

6 files changed

+12
-58
lines changed

6 files changed

+12
-58
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dependencies {
4949
signature 'org.codehaus.mojo.signature:java16:1.1@signature'
5050

5151
compile "org.reactivestreams:reactive-streams:1.0.0"
52-
compile "io.reactivex.rxjava2:rxjava:2.0.6"
52+
compile "io.reactivex.rxjava2:rxjava:2.0.7"
5353

5454
testCompile group: 'junit', name: 'junit', version: '4.12'
5555

src/main/java/hu/akarnokd/rxjava2/async/AsyncFlowable.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.reactivex.functions.*;
2828
import io.reactivex.internal.disposables.SequentialDisposable;
2929
import io.reactivex.internal.functions.Functions;
30-
import io.reactivex.internal.operators.flowable.FlowableSubscribeOn;
3130
import io.reactivex.internal.subscribers.LambdaSubscriber;
3231
import io.reactivex.plugins.RxJavaPlugins;
3332
import io.reactivex.processors.*;
@@ -1718,7 +1717,6 @@ public static <T> Future<Object> forEachFuture(
17181717
* @return the Future representing the entire for-each operation
17191718
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Async-Operators#wiki-foreachfuture">RxJava Wiki: forEachFuture()</a>
17201719
*/
1721-
@SuppressWarnings("unchecked")
17221720
public static <T> Future<Object> forEachFuture(
17231721
Publisher<? extends T> source,
17241722
Consumer<? super T> onNext,
@@ -1762,7 +1760,7 @@ public void accept(Subscription s) throws Exception {
17621760
});
17631761
d.lazySet(ls);
17641762

1765-
RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>((Publisher<T>)source, scheduler, false)).subscribe(ls);
1763+
Flowable.fromPublisher(source).subscribeOn(scheduler).subscribe(ls);
17661764

17671765
return f;
17681766
}

src/main/java/hu/akarnokd/rxjava2/basetypes/Perhaps.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.reactivex.exceptions.Exceptions;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.functions.*;
28-
import io.reactivex.internal.operators.flowable.*;
2928
import io.reactivex.internal.subscribers.LambdaSubscriber;
3029
import io.reactivex.internal.util.ExceptionHelper;
3130
import io.reactivex.plugins.RxJavaPlugins;
@@ -933,7 +932,7 @@ public final Perhaps<T> switchIfEmpty(Perhaps<? extends T> other) {
933932
* @return the new Flowable instance
934933
*/
935934
public final Flowable<T> repeat() {
936-
return RxJavaPlugins.onAssembly(new FlowableRepeat<T>(this, Long.MAX_VALUE));
935+
return Flowable.fromPublisher(this).repeat();
937936
}
938937

939938
/**
@@ -942,10 +941,7 @@ public final Flowable<T> repeat() {
942941
* @return the new Flowable instance
943942
*/
944943
public final Flowable<T> repeat(long times) {
945-
if (times < 0L) {
946-
throw new IllegalArgumentException("times >= 0 required but it was " + times);
947-
}
948-
return RxJavaPlugins.onAssembly(new FlowableRepeat<T>(this, times));
944+
return Flowable.fromPublisher(this).repeat(times);
949945
}
950946

951947
/**
@@ -956,8 +952,7 @@ public final Flowable<T> repeat(long times) {
956952
* @return the new Flowable instance
957953
*/
958954
public final Flowable<T> repeat(BooleanSupplier stop) {
959-
ObjectHelper.requireNonNull(stop, "stop is null");
960-
return RxJavaPlugins.onAssembly(new FlowableRepeatUntil<T>(this, stop));
955+
return Flowable.fromPublisher(this).repeatUntil(stop);
961956
}
962957

963958
/**
@@ -969,8 +964,7 @@ public final Flowable<T> repeat(BooleanSupplier stop) {
969964
* @return the new Flowable instance
970965
*/
971966
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
972-
ObjectHelper.requireNonNull(handler, "handler is null");
973-
return RxJavaPlugins.onAssembly(new FlowableRepeatWhen<T>(this, handler));
967+
return Flowable.fromPublisher(this).repeatWhen(handler);
974968
}
975969

976970
/**

src/main/java/hu/akarnokd/rxjava2/basetypes/Solo.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import io.reactivex.exceptions.Exceptions;
2626
import io.reactivex.functions.*;
2727
import io.reactivex.internal.functions.*;
28-
import io.reactivex.internal.operators.flowable.*;
2928
import io.reactivex.internal.subscribers.LambdaSubscriber;
3029
import io.reactivex.internal.util.ExceptionHelper;
3130
import io.reactivex.plugins.RxJavaPlugins;
@@ -1179,7 +1178,7 @@ public final <R> Solo<R> lift(Function<Subscriber<? super R>, Subscriber<? super
11791178
* @return the new Flowable instance
11801179
*/
11811180
public final Flowable<T> repeat() {
1182-
return RxJavaPlugins.onAssembly(new FlowableRepeat<T>(this, Long.MAX_VALUE));
1181+
return Flowable.fromPublisher(this).repeat();
11831182
}
11841183

11851184
/**
@@ -1188,10 +1187,7 @@ public final Flowable<T> repeat() {
11881187
* @return the new Flowable instance
11891188
*/
11901189
public final Flowable<T> repeat(long times) {
1191-
if (times < 0L) {
1192-
throw new IllegalArgumentException("times >= 0 required but it was " + times);
1193-
}
1194-
return RxJavaPlugins.onAssembly(new FlowableRepeat<T>(this, times));
1190+
return Flowable.fromPublisher(this).repeat(times);
11951191
}
11961192

11971193
/**
@@ -1202,8 +1198,7 @@ public final Flowable<T> repeat(long times) {
12021198
* @return the new Flowable instance
12031199
*/
12041200
public final Flowable<T> repeat(BooleanSupplier stop) {
1205-
ObjectHelper.requireNonNull(stop, "stop is null");
1206-
return RxJavaPlugins.onAssembly(new FlowableRepeatUntil<T>(this, stop));
1201+
return Flowable.fromPublisher(this).repeatUntil(stop);
12071202
}
12081203

12091204
/**
@@ -1215,8 +1210,7 @@ public final Flowable<T> repeat(BooleanSupplier stop) {
12151210
* @return the new Flowable instance
12161211
*/
12171212
public final Flowable<T> repeatWhen(Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
1218-
ObjectHelper.requireNonNull(handler, "handler is null");
1219-
return RxJavaPlugins.onAssembly(new FlowableRepeatWhen<T>(this, handler));
1213+
return Flowable.fromPublisher(this).repeatWhen(handler);
12201214
}
12211215

12221216
/**

src/main/java/hu/akarnokd/rxjava2/parallel/ParallelConcatMap.java

+1-17
Original file line numberDiff line numberDiff line change
@@ -66,24 +66,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
6666
@SuppressWarnings("unchecked")
6767
final Subscriber<T>[] parents = new Subscriber[n];
6868

69-
// FIXME cheat until we have support from RxJava2 internals
70-
Publisher<T> p = new Publisher<T>() {
71-
int i;
72-
73-
@SuppressWarnings("unchecked")
74-
@Override
75-
public void subscribe(Subscriber<? super T> s) {
76-
parents[i++] = (Subscriber<T>)s;
77-
}
78-
};
79-
80-
FlowableConcatMap<T, R> op = new FlowableConcatMap<T, R>(p, mapper, prefetch, errorMode);
81-
8269
for (int i = 0; i < n; i++) {
83-
84-
op.subscribe(subscribers[i]);
85-
// FIXME needs a FlatMap subscriber
86-
// parents[i] = FlowableConcatMap.createSubscriber(s, mapper, prefetch, errorMode);
70+
parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode);
8771
}
8872

8973
source.subscribe(parents);

src/main/java/hu/akarnokd/rxjava2/parallel/ParallelFlatMap.java

+1-17
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,8 @@ public void subscribe(Subscriber<? super R>[] subscribers) {
7070
@SuppressWarnings("unchecked")
7171
final Subscriber<T>[] parents = new Subscriber[n];
7272

73-
// FIXME cheat until we have support from RxJava2 internals
74-
Publisher<T> p = new Publisher<T>() {
75-
int i;
76-
77-
@SuppressWarnings("unchecked")
78-
@Override
79-
public void subscribe(Subscriber<? super T> s) {
80-
parents[i++] = (Subscriber<T>)s;
81-
}
82-
};
83-
84-
FlowableFlatMap<T, R> op = new FlowableFlatMap<T, R>(p, mapper, delayError, maxConcurrency, prefetch);
85-
8673
for (int i = 0; i < n; i++) {
87-
88-
op.subscribe(subscribers[i]);
89-
// FIXME needs a FlatMap subscriber
90-
// parents[i] = FlowableFlatMap.createSubscriber(s, mapper, delayError, maxConcurrency, prefetch);
74+
parents[i] = FlowableFlatMap.subscribe(subscribers[i], mapper, delayError, maxConcurrency, prefetch);
9175
}
9276

9377
source.subscribe(parents);

0 commit comments

Comments
 (0)