Skip to content

Commit c415b32

Browse files
authored
2.x: Fix zip not stopping the subscription upon eager error (#6488)
1 parent 4ca7a9b commit c415b32

File tree

3 files changed

+64
-0
lines changed

3 files changed

+64
-0
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java

+4
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ public void drain() {
179179
if (z.done && !delayError) {
180180
Throwable ex = z.error;
181181
if (ex != null) {
182+
cancelled = true;
182183
cancel();
183184
a.onError(ex);
184185
return;
@@ -224,6 +225,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
224225
if (delayError) {
225226
if (empty) {
226227
Throwable e = source.error;
228+
cancelled = true;
227229
cancel();
228230
if (e != null) {
229231
a.onError(e);
@@ -235,11 +237,13 @@ boolean checkTerminated(boolean d, boolean empty, Observer<? super R> a, boolean
235237
} else {
236238
Throwable e = source.error;
237239
if (e != null) {
240+
cancelled = true;
238241
cancel();
239242
a.onError(e);
240243
return true;
241244
} else
242245
if (empty) {
246+
cancelled = true;
243247
cancel();
244248
a.onComplete();
245249
return true;

src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -1895,4 +1895,34 @@ public Integer apply(Integer a, Integer b) throws Exception {
18951895

18961896
ts.assertResult(4);
18971897
}
1898+
1899+
@Test
1900+
public void firstErrorPreventsSecondSubscription() {
1901+
final AtomicInteger counter = new AtomicInteger();
1902+
1903+
List<Flowable<?>> flowableList = new ArrayList<Flowable<?>>();
1904+
flowableList.add(Flowable.create(new FlowableOnSubscribe<Object>() {
1905+
@Override
1906+
public void subscribe(FlowableEmitter<Object> e)
1907+
throws Exception { throw new TestException(); }
1908+
}, BackpressureStrategy.MISSING));
1909+
flowableList.add(Flowable.create(new FlowableOnSubscribe<Object>() {
1910+
@Override
1911+
public void subscribe(FlowableEmitter<Object> e)
1912+
throws Exception { counter.getAndIncrement(); }
1913+
}, BackpressureStrategy.MISSING));
1914+
1915+
Flowable.zip(flowableList,
1916+
new Function<Object[], Object>() {
1917+
@Override
1918+
public Object apply(Object[] a) throws Exception {
1919+
return a;
1920+
}
1921+
})
1922+
.test()
1923+
.assertFailure(TestException.class)
1924+
;
1925+
1926+
assertEquals(0, counter.get());
1927+
}
18981928
}

src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -1428,4 +1428,34 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
14281428
ps2.onNext(2);
14291429
to.assertResult(3);
14301430
}
1431+
1432+
@Test
1433+
public void firstErrorPreventsSecondSubscription() {
1434+
final AtomicInteger counter = new AtomicInteger();
1435+
1436+
List<Observable<?>> observableList = new ArrayList<Observable<?>>();
1437+
observableList.add(Observable.create(new ObservableOnSubscribe<Object>() {
1438+
@Override
1439+
public void subscribe(ObservableEmitter<Object> e)
1440+
throws Exception { throw new TestException(); }
1441+
}));
1442+
observableList.add(Observable.create(new ObservableOnSubscribe<Object>() {
1443+
@Override
1444+
public void subscribe(ObservableEmitter<Object> e)
1445+
throws Exception { counter.getAndIncrement(); }
1446+
}));
1447+
1448+
Observable.zip(observableList,
1449+
new Function<Object[], Object>() {
1450+
@Override
1451+
public Object apply(Object[] a) throws Exception {
1452+
return a;
1453+
}
1454+
})
1455+
.test()
1456+
.assertFailure(TestException.class)
1457+
;
1458+
1459+
assertEquals(0, counter.get());
1460+
}
14311461
}

0 commit comments

Comments
 (0)