Skip to content

Commit 4bbdb88

Browse files
committed
Fix expand current concurrency on cancel()
1 parent 309771e commit 4bbdb88

File tree

2 files changed

+165
-16
lines changed

2 files changed

+165
-16
lines changed

src/main/java/hu/akarnokd/rxjava2/operators/FlowableExpand.java

+45-15
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ static final class ExpandDepthSubscription<T>
185185

186186
final AtomicLong requested;
187187

188+
final AtomicReference<Object> current;
189+
188190
ArrayDeque<ExpandDepthSubscriber> subscriptionStack;
189191

190192
volatile boolean cancelled;
@@ -193,8 +195,6 @@ static final class ExpandDepthSubscription<T>
193195

194196
long consumed;
195197

196-
ExpandDepthSubscriber current;
197-
198198
ExpandDepthSubscription(Subscriber<? super T> actual,
199199
Function<? super T, ? extends Publisher<? extends T>> expander,
200200
int capacityHint) {
@@ -204,6 +204,7 @@ static final class ExpandDepthSubscription<T>
204204
this.error = new AtomicThrowable();
205205
this.active = new AtomicInteger();
206206
this.requested = new AtomicLong();
207+
this.current = new AtomicReference<Object>();
207208
}
208209

209210
@Override
@@ -214,6 +215,7 @@ public void request(long n) {
214215
}
215216
}
216217

218+
@SuppressWarnings("unchecked")
217219
@Override
218220
public void cancel() {
219221
if (!cancelled) {
@@ -229,6 +231,11 @@ public void cancel() {
229231
q.poll().dispose();
230232
}
231233
}
234+
235+
Object o = current.getAndSet(this);
236+
if (o != this && o != null) {
237+
((ExpandDepthSubscriber)o).dispose();
238+
}
232239
}
233240
}
234241

@@ -250,6 +257,21 @@ boolean push(ExpandDepthSubscriber subscriber) {
250257
}
251258
}
252259

260+
boolean setCurrent(ExpandDepthSubscriber inner) {
261+
for (;;) {
262+
Object o = current.get();
263+
if (o == this) {
264+
if (inner != null) {
265+
inner.dispose();
266+
}
267+
return false;
268+
}
269+
if (current.compareAndSet(o, inner)) {
270+
return true;
271+
}
272+
}
273+
}
274+
253275
void drainQueue() {
254276
if (getAndIncrement() != 0) {
255277
return;
@@ -261,13 +283,14 @@ void drainQueue() {
261283
AtomicInteger n = active;
262284

263285
for (;;) {
264-
if (cancelled) {
286+
Object o = current.get();
287+
if (cancelled || o == this) {
265288
source = null;
266-
current = null;
267289
return;
268290
}
269291

270-
ExpandDepthSubscriber curr = current;
292+
@SuppressWarnings("unchecked")
293+
ExpandDepthSubscriber curr = (ExpandDepthSubscriber)o;
271294
Publisher<? extends T> p = source;
272295

273296
if (curr == null && p != null) {
@@ -276,9 +299,11 @@ void drainQueue() {
276299

277300
ExpandDepthSubscriber eds = new ExpandDepthSubscriber();
278301
curr = eds;
279-
current = eds;
280-
281-
p.subscribe(eds);
302+
if (setCurrent(eds)) {
303+
p.subscribe(eds);
304+
} else {
305+
return;
306+
}
282307
} else {
283308

284309
boolean currentDone = curr.done;
@@ -306,10 +331,12 @@ void drainQueue() {
306331
if (push(curr)) {
307332
n.getAndIncrement();
308333
curr = new ExpandDepthSubscriber();
309-
current = curr;
310-
311-
p.subscribe(curr);
312-
newSource = true;
334+
if (setCurrent(curr)) {
335+
p.subscribe(curr);
336+
newSource = true;
337+
} else {
338+
return;
339+
}
313340
}
314341
}
315342
}
@@ -326,9 +353,12 @@ void drainQueue() {
326353
return;
327354
}
328355
curr = pop();
329-
current = curr;
330-
curr.requestOne();
331-
continue;
356+
if (setCurrent(curr)) {
357+
curr.requestOne();
358+
continue;
359+
} else {
360+
return;
361+
}
332362
}
333363
}
334364
}

src/test/java/hu/akarnokd/rxjava2/operators/FlowableExpandTest.java

+120-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import java.io.IOException;
2020
import java.util.*;
21-
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.*;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.junit.*;
2425
import org.reactivestreams.*;
2526

27+
import hu.akarnokd.rxjava2.test.TestHelper;
2628
import io.reactivex.*;
2729
import io.reactivex.functions.Function;
2830
import io.reactivex.internal.functions.Functions;
@@ -390,4 +392,121 @@ public void onComplete() {
390392

391393
ts.assertResult(1);
392394
}
395+
396+
@Test
397+
public void depthCancelRace() {
398+
for (int i = 0; i < 1000; i++) {
399+
final TestSubscriber<Integer> ts = Flowable.just(0)
400+
.compose(FlowableTransformers.<Integer>expand(countDown, ExpandStrategy.DEPTH_FIRST))
401+
.test(0);
402+
403+
Runnable r1 = new Runnable() {
404+
@Override
405+
public void run() {
406+
ts.request(1);
407+
}
408+
};
409+
Runnable r2 = new Runnable() {
410+
@Override
411+
public void run() {
412+
ts.cancel();
413+
}
414+
};
415+
416+
TestHelper.race(r1, r2, Schedulers.single());
417+
}
418+
}
419+
420+
@Test
421+
public void depthEmitCancelRace() {
422+
for (int i = 0; i < 1000; i++) {
423+
424+
final PublishProcessor<Integer> pp = PublishProcessor.create();
425+
426+
final TestSubscriber<Integer> ts = Flowable.just(0)
427+
.compose(FlowableTransformers.<Integer>expand(Functions.justFunction(pp), ExpandStrategy.DEPTH_FIRST))
428+
.test(1);
429+
430+
Runnable r1 = new Runnable() {
431+
@Override
432+
public void run() {
433+
pp.onNext(1);
434+
}
435+
};
436+
Runnable r2 = new Runnable() {
437+
@Override
438+
public void run() {
439+
ts.cancel();
440+
}
441+
};
442+
443+
TestHelper.race(r1, r2, Schedulers.single());
444+
}
445+
}
446+
447+
@Test
448+
public void depthCompleteCancelRace() {
449+
for (int i = 0; i < 1000; i++) {
450+
451+
final PublishProcessor<Integer> pp = PublishProcessor.create();
452+
453+
final TestSubscriber<Integer> ts = Flowable.just(0)
454+
.compose(FlowableTransformers.<Integer>expand(Functions.justFunction(pp), ExpandStrategy.DEPTH_FIRST))
455+
.test(1);
456+
457+
Runnable r1 = new Runnable() {
458+
@Override
459+
public void run() {
460+
pp.onComplete();
461+
}
462+
};
463+
Runnable r2 = new Runnable() {
464+
@Override
465+
public void run() {
466+
ts.cancel();
467+
}
468+
};
469+
470+
TestHelper.race(r1, r2, Schedulers.single());
471+
}
472+
}
473+
474+
@Test
475+
public void depthCancelRace2() throws Exception {
476+
for (int i = 0; i < 1000; i++) {
477+
478+
final PublishProcessor<Integer> pp = PublishProcessor.create();
479+
480+
Flowable<Integer> source = Flowable.just(0)
481+
.compose(FlowableTransformers.<Integer>expand(Functions.justFunction(pp), ExpandStrategy.DEPTH_FIRST));
482+
483+
final CountDownLatch cdl = new CountDownLatch(1);
484+
485+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
486+
final AtomicInteger sync = new AtomicInteger(2);
487+
488+
@Override
489+
public void onNext(Integer t) {
490+
super.onNext(t);
491+
Schedulers.single().scheduleDirect(new Runnable() {
492+
@Override
493+
public void run() {
494+
if (sync.decrementAndGet() != 0) {
495+
while (sync.get() != 0) { }
496+
}
497+
cancel();
498+
cdl.countDown();
499+
}
500+
});
501+
if (sync.decrementAndGet() != 0) {
502+
while (sync.get() != 0) { }
503+
}
504+
}
505+
};
506+
507+
source.subscribe(ts);
508+
509+
Assert.assertTrue(cdl.await(5, TimeUnit.SECONDS));
510+
}
511+
}
393512
}

0 commit comments

Comments
 (0)