Skip to content

Commit cfb23d0

Browse files
committed
Fix expand inconsistent error handling
1 parent 3b3ed42 commit cfb23d0

File tree

3 files changed

+185
-10
lines changed

3 files changed

+185
-10
lines changed

Diff for: src/main/java/hu/akarnokd/rxjava2/operators/FlowableExpand.java

+38-9
Original file line numberDiff line numberDiff line change
@@ -45,31 +45,34 @@ final class FlowableExpand<T> extends Flowable<T> implements FlowableTransformer
4545

4646
final int capacityHint;
4747

48+
final boolean delayErrors;
49+
4850
FlowableExpand(Flowable<T> source, Function<? super T, ? extends Publisher<? extends T>> expander,
49-
ExpandStrategy strategy, int capacityHint) {
51+
ExpandStrategy strategy, int capacityHint, boolean delayErrors) {
5052
this.source = source;
5153
this.expander = expander;
5254
this.strategy = strategy;
5355
this.capacityHint = capacityHint;
56+
this.delayErrors = delayErrors;
5457
}
5558

5659
@Override
5760
protected void subscribeActual(Subscriber<? super T> s) {
5861
if (strategy != ExpandStrategy.DEPTH_FIRST) {
59-
ExpandBreadthSubscriber<T> parent = new ExpandBreadthSubscriber<T>(s, expander, capacityHint);
62+
ExpandBreadthSubscriber<T> parent = new ExpandBreadthSubscriber<T>(s, expander, capacityHint, delayErrors);
6063
parent.queue.offer(source);
6164
s.onSubscribe(parent);
6265
parent.drainQueue();
6366
} else {
64-
ExpandDepthSubscription<T> parent = new ExpandDepthSubscription<T>(s, expander, capacityHint);
67+
ExpandDepthSubscription<T> parent = new ExpandDepthSubscription<T>(s, expander, capacityHint, delayErrors);
6568
parent.source = source;
6669
s.onSubscribe(parent);
6770
}
6871
}
6972

7073
@Override
7174
public Publisher<T> apply(Flowable<T> upstream) {
72-
return new FlowableExpand<T>(upstream, expander, strategy, capacityHint);
75+
return new FlowableExpand<T>(upstream, expander, strategy, capacityHint, delayErrors);
7376
}
7477

7578
static final class ExpandBreadthSubscriber<T> extends SubscriptionArbiter implements FlowableSubscriber<T> {
@@ -84,16 +87,22 @@ static final class ExpandBreadthSubscriber<T> extends SubscriptionArbiter implem
8487

8588
final AtomicInteger wip;
8689

90+
final boolean delayErrors;
91+
92+
final AtomicThrowable errors;
93+
8794
volatile boolean active;
8895

8996
long produced;
9097

9198
ExpandBreadthSubscriber(Subscriber<? super T> actual,
92-
Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint) {
99+
Function<? super T, ? extends Publisher<? extends T>> expander, int capacityHint, boolean delayErrors) {
93100
this.actual = actual;
94101
this.expander = expander;
95102
this.wip = new AtomicInteger();
96103
this.queue = new SpscLinkedArrayQueue<Publisher<? extends T>>(capacityHint);
104+
this.errors = new AtomicThrowable();
105+
this.delayErrors = delayErrors;
97106
}
98107

99108
@Override
@@ -123,8 +132,13 @@ public void onNext(T t) {
123132
@Override
124133
public void onError(Throwable t) {
125134
setSubscription(SubscriptionHelper.CANCELLED);
126-
super.cancel();
127-
actual.onError(t);
135+
if (delayErrors) {
136+
errors.addThrowable(t);
137+
active = false;
138+
} else {
139+
super.cancel();
140+
actual.onError(t);
141+
}
128142
drainQueue();
129143
}
130144

@@ -151,7 +165,12 @@ void drainQueue() {
151165
if (q.isEmpty()) {
152166
setSubscription(SubscriptionHelper.CANCELLED);
153167
super.cancel();
154-
actual.onComplete();
168+
Throwable ex = errors.terminate();
169+
if (ex == null) {
170+
actual.onComplete();
171+
} else {
172+
actual.onError(ex);
173+
}
155174
} else {
156175
Publisher<? extends T> p = q.poll();
157176
long c = produced;
@@ -187,6 +206,8 @@ static final class ExpandDepthSubscription<T>
187206

188207
final AtomicReference<Object> current;
189208

209+
final boolean delayErrors;
210+
190211
ArrayDeque<ExpandDepthSubscriber> subscriptionStack;
191212

192213
volatile boolean cancelled;
@@ -197,14 +218,15 @@ static final class ExpandDepthSubscription<T>
197218

198219
ExpandDepthSubscription(Subscriber<? super T> actual,
199220
Function<? super T, ? extends Publisher<? extends T>> expander,
200-
int capacityHint) {
221+
int capacityHint, boolean delayErrors) {
201222
this.actual = actual;
202223
this.expander = expander;
203224
this.subscriptionStack = new ArrayDeque<ExpandDepthSubscriber>();
204225
this.error = new AtomicThrowable();
205226
this.active = new AtomicInteger();
206227
this.requested = new AtomicLong();
207228
this.current = new AtomicReference<Object>();
229+
this.delayErrors = delayErrors;
208230
}
209231

210232
@Override
@@ -307,6 +329,13 @@ void drainQueue() {
307329
} else {
308330

309331
boolean currentDone = curr.done;
332+
333+
if (!delayErrors && error.get() != null) {
334+
cancel();
335+
a.onError(error.terminate());
336+
return;
337+
}
338+
310339
T v = curr.value;
311340

312341
boolean newSource = false;

Diff for: src/main/java/hu/akarnokd/rxjava2/operators/FlowableTransformers.java

+55-1
Original file line numberDiff line numberDiff line change
@@ -901,7 +901,61 @@ public static <T> FlowableTransformer<T, T> expand(Function<? super T, ? extends
901901
ObjectHelper.requireNonNull(expander, "expander is null");
902902
ObjectHelper.requireNonNull(strategy, "strategy is null");
903903
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
904-
return new FlowableExpand<T>(null, expander, strategy, capacityHint);
904+
return new FlowableExpand<T>(null, expander, strategy, capacityHint, false);
905+
}
906+
907+
/**
908+
* Emits elements from the source and then expands them into another layer of Publishers, emitting
909+
* those items recursively until all Publishers become empty in a depth-first manner,
910+
* delaying errors until all sources terminate.
911+
* @param <T> the value type
912+
* @param expander the function that converts an element into a Publisher to be expanded
913+
* @return the new FlwoableTransformer instance
914+
*
915+
* @since 0.18.4
916+
*/
917+
public static <T> FlowableTransformer<T, T> expandDelayError(Function<? super T, ? extends Publisher<? extends T>> expander) {
918+
return expandDelayError(expander, ExpandStrategy.DEPTH_FIRST, Flowable.bufferSize());
919+
}
920+
921+
/**
922+
* Emits elements from the source and then expands them into another layer of Publishers, emitting
923+
* those items recursively until all Publishers become empty with the specified strategy,
924+
* delaying errors until all sources terminate.
925+
* @param <T> the value type
926+
* @param expander the function that converts an element into a Publisher to be expanded
927+
* @param strategy the expansion strategy; depth-first will recursively expand the first item until there is no
928+
* more expansion possible, then the second items, and so on;
929+
* breadth-first will first expand the main source, then runs the expaned
930+
* Publishers in sequence, then the 3rd level, and so on.
931+
* @return the new FlwoableTransformer instance
932+
*
933+
* @since 0.18.4
934+
*/
935+
public static <T> FlowableTransformer<T, T> expandDelayError(Function<? super T, ? extends Publisher<? extends T>> expander, ExpandStrategy strategy) {
936+
return expandDelayError(expander, strategy, Flowable.bufferSize());
937+
}
938+
939+
/**
940+
* Emits elements from the source and then expands them into another layer of Publishers, emitting
941+
* those items recursively until all Publishers become empty with the specified strategy,
942+
* delaying errors until all sources terminate.
943+
* @param <T> the value type
944+
* @param expander the function that converts an element into a Publisher to be expanded
945+
* @param strategy the expansion strategy; depth-first will recursively expand the first item until there is no
946+
* more expansion possible, then the second items, and so on;
947+
* breadth-first will first expand the main source, then runs the expaned
948+
* Publishers in sequence, then the 3rd level, and so on.
949+
* @param capacityHint the capacity hint for the breadth-first expansion
950+
* @return the new FlwoableTransformer instance
951+
*
952+
* @since 0.18.4
953+
*/
954+
public static <T> FlowableTransformer<T, T> expandDelayError(Function<? super T, ? extends Publisher<? extends T>> expander, ExpandStrategy strategy, int capacityHint) {
955+
ObjectHelper.requireNonNull(expander, "expander is null");
956+
ObjectHelper.requireNonNull(strategy, "strategy is null");
957+
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
958+
return new FlowableExpand<T>(null, expander, strategy, capacityHint, true);
905959
}
906960

907961
/**

Diff for: src/test/java/hu/akarnokd/rxjava2/operators/FlowableExpandTest.java

+92
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,18 @@ public void error() {
7171
Flowable.<Integer>error(new IOException())
7272
.compose(FlowableTransformers.expand(countDown, strategy))
7373
.test()
74+
.withTag(strategy.toString())
75+
.assertFailure(IOException.class);
76+
}
77+
}
78+
79+
@Test
80+
public void errorDelayed() {
81+
for (ExpandStrategy strategy : ExpandStrategy.values()) {
82+
Flowable.<Integer>error(new IOException())
83+
.compose(FlowableTransformers.expandDelayError(countDown, strategy))
84+
.test()
85+
.withTag(strategy.toString())
7486
.assertFailure(IOException.class);
7587
}
7688
}
@@ -509,4 +521,84 @@ public void run() {
509521
Assert.assertTrue(cdl.await(5, TimeUnit.SECONDS));
510522
}
511523
}
524+
525+
@Test
526+
public void multipleRoots() {
527+
Flowable.just(10, 5)
528+
.compose(FlowableTransformers.expand(new Function<Integer, Publisher<Integer>>() {
529+
@Override
530+
public Publisher<Integer> apply(Integer v)
531+
throws Exception {
532+
if (v == 9) {
533+
return Flowable.error(new IOException("error"));
534+
} else if (v == 0) {
535+
return Flowable.empty();
536+
} else {
537+
return Flowable.just(v - 1);
538+
}
539+
}
540+
}, ExpandStrategy.DEPTH_FIRST))
541+
.test()
542+
.assertFailureAndMessage(IOException.class, "error", 10, 9);
543+
}
544+
545+
@Test
546+
public void multipleRootsDelayError() {
547+
Flowable.just(10, 5)
548+
.compose(FlowableTransformers.expandDelayError(new Function<Integer, Publisher<Integer>>() {
549+
@Override
550+
public Publisher<Integer> apply(Integer v)
551+
throws Exception {
552+
if (v == 9) {
553+
return Flowable.error(new IOException("error"));
554+
} else if (v == 0) {
555+
return Flowable.empty();
556+
} else {
557+
return Flowable.just(v - 1);
558+
}
559+
}
560+
}, ExpandStrategy.DEPTH_FIRST))
561+
.test()
562+
.assertFailureAndMessage(IOException.class, "error", 10, 9, 5, 4, 3, 2, 1, 0);
563+
}
564+
565+
@Test
566+
public void multipleRootsBreadth() {
567+
Flowable.just(10, 5)
568+
.compose(FlowableTransformers.expand(new Function<Integer, Publisher<Integer>>() {
569+
@Override
570+
public Publisher<Integer> apply(Integer v)
571+
throws Exception {
572+
if (v == 9) {
573+
return Flowable.error(new IOException("error"));
574+
} else if (v == 0) {
575+
return Flowable.empty();
576+
} else {
577+
return Flowable.just(v - 1);
578+
}
579+
}
580+
}, ExpandStrategy.BREADTH_FIRST))
581+
.test()
582+
.assertFailureAndMessage(IOException.class, "error", 10, 5, 9, 4);
583+
}
584+
585+
@Test
586+
public void multipleRootsDelayErrorBreadth() {
587+
Flowable.just(10, 5)
588+
.compose(FlowableTransformers.expandDelayError(new Function<Integer, Publisher<Integer>>() {
589+
@Override
590+
public Publisher<Integer> apply(Integer v)
591+
throws Exception {
592+
if (v == 9) {
593+
return Flowable.error(new IOException("error"));
594+
} else if (v == 0) {
595+
return Flowable.empty();
596+
} else {
597+
return Flowable.just(v - 1);
598+
}
599+
}
600+
}, ExpandStrategy.BREADTH_FIRST))
601+
.test()
602+
.assertFailureAndMessage(IOException.class, "error", 10, 5, 9, 4, 3, 2, 1, 0);
603+
}
512604
}

0 commit comments

Comments
 (0)