Skip to content

Commit d4943a2

Browse files
committed
SwitchFlatMap working.
1 parent c03441b commit d4943a2

File tree

4 files changed

+484
-77
lines changed

4 files changed

+484
-77
lines changed

Diff for: README.md

+32-12
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ try {
572572
The custom transformers (to be applied with `Flowable.compose` for example), can be found in `hu.akarnokd.rxjava2.operators.FlowableTransformers` class. The custom source-like operators can be found in `hu.akarnokd.rxjava2.operators.Flowables` class. The operators and transformers for the other base
573573
reactive classes (will) follow the usual naming scheme.
574574

575-
### FlowableTransflormers.valve
575+
### FlowableTransflormers.valve()
576576

577577
Pauses and resumes a main flow if the secondary flow signals false and true respectively.
578578

@@ -602,7 +602,7 @@ valveSource.onNext(true);
602602
Thread.sleep(3000);
603603
```
604604

605-
### Flowables.orderedMerge
605+
### Flowables.orderedMerge()
606606

607607
Given a fixed number of input sources (which can be self-comparable or given a `Comparator`) merges them
608608
into a single stream by repeatedly picking the smallest one from each source until all of them completes.
@@ -613,7 +613,7 @@ Flowables.orderedMerge(Flowable.just(1, 3, 5), Flowable.just(2, 4, 6))
613613
.assertResult(1, 2, 3, 4, 5, 6);
614614
```
615615

616-
### FlowableTransformers.bufferWhile
616+
### FlowableTransformers.bufferWhile()
617617

618618
Buffers into a list/collection while the given predicate returns true for
619619
the current item, otherwise starts a new list/collection containing the given item (i.e., the "separator" ends up in the next list/collection).
@@ -630,7 +630,7 @@ Flowable.just("1", "2", "#", "3", "#", "4", "#")
630630
);
631631
```
632632

633-
### FlowableTransformers.bufferUntil
633+
### FlowableTransformers.bufferUntil()
634634

635635
Buffers into a list/collection until the given predicate returns true for
636636
the current item and starts an new empty list/collection (i.e., the "separator" ends up in the same list/collection).
@@ -646,7 +646,7 @@ Flowable.just("1", "2", "#", "3", "#", "4", "#")
646646
);
647647
```
648648

649-
### FlowableTransformers.bufferSplit
649+
### FlowableTransformers.bufferSplit()
650650

651651
Buffers into a list/collection while the predicate returns false. When it returns true,
652652
a new buffer is started and the particular item won't be in any of the buffers.
@@ -662,7 +662,7 @@ Flowable.just("1", "2", "#", "3", "#", "4", "#")
662662
);
663663
```
664664

665-
### FlowableTransformers.spanout
665+
### FlowableTransformers.spanout()
666666

667667
Inserts a time delay between emissions from the upstream. For example, if the upstream emits 1, 2, 3 in a quick succession, a spanout(1, TimeUnit.SECONDS) will emit 1 immediately, 2 after a second and 3 after a second after 2. You can specify the initial delay, a custom scheduler and if an upstream error should be delayed after the normal items or not.
668668

@@ -675,7 +675,7 @@ Flowable.range(1, 10)
675675
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
676676
```
677677

678-
### FlowableTransformers.mapFilter
678+
### FlowableTransformers.mapFilter()
679679

680680
A callback `Consumer` is called with the current upstream value and a `BasicEmitter` on which doXXX methods can be called
681681
to transform a value, signal an error or stop a sequence. If none of the `doXXX` methods is called, the current value is dropped and another is requested from upstream. The operator is a pass-through for downstream requests otherwise.
@@ -694,7 +694,7 @@ Flowable.range(1, 10)
694694
.assertResult(4, 8);
695695
```
696696

697-
### FlowableTransformers.onBackpressureTimeout
697+
### FlowableTransformers.onBackpressureTimeout()
698698

699699
Consumes the upstream in an unbounded manner and buffers elements until the downstream requests but each buffered element has an associated timeout after which it becomes unavailable. Note that this may create discontinuities in the stream. In addition, an overload allows specifying the maximum buffer size and an eviction action which gets triggered when the buffer reaches its
700700
capacity or elements time out.
@@ -709,7 +709,7 @@ Flowable.intervalRange(1, 5, 100, 100, TimeUnit.MILLISECONDS)
709709
.assertResult();
710710
```
711711

712-
### Flowables.repeat
712+
### Flowables.repeat()
713713

714714
Repeats a scalar value indefinitely (until the downstream actually cancels), honoring backpressure and supporting synchronous fusion and/or conditional fusion.
715715

@@ -722,7 +722,7 @@ Flowable.repeat("doesn't matter")
722722
.assertResult(true);
723723
```
724724

725-
### Flowables.repeatCallable
725+
### Flowables.repeatCallable()
726726

727727
Repeatedly calls a callable, indefinitely (until the downstream actually cancels) or if the callable throws or returns null (when it signals `NullPointerException`), honoring backpressure and supporting synchronous fusion and/or conditional fusion.
728728

@@ -734,7 +734,7 @@ Flowable.repeatCallable(() -> ThreadLocalRandom.current().nextDouble())
734734
.assertResult(true);
735735
```
736736

737-
### FlowableTransformers.every
737+
### FlowableTransformers.every()
738738

739739
Relays every Nth item from upstream (skipping the in-between items).
740740

@@ -745,7 +745,7 @@ Flowable.range(1, 5)
745745
.assertResult(2, 4)
746746
```
747747

748-
### Flowables.intervalBackpressure
748+
### Flowables.intervalBackpressure()
749749

750750
Emit an ever increasing series of long values, starting from 0L and "buffer"
751751
emissions in case the downstream can't keep up. The "buffering" is virtual and isn't accompanied by increased memory usage if it happens for a longer
@@ -825,7 +825,27 @@ Flowable.just(0, 50, 100, 150, 400, 500, 550, 1000)
825825
826826
### FlowableTransformer.switchFlatMap
827827
828+
This is a combination of switchMap and a limited flatMap. It merges a maximum number of Publishers at once but if a new inner Publisher gets mapped in and the active count is at max, the oldest active Publisher is cancelled and the new inner Publisher gets flattened as well. Running with `maxActive == 1` is equivalent to the plain `switchMap`.
828829
830+
```java
831+
Flowable.just(100, 300, 500)
832+
.flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
833+
.compose(FlowableTransformers.switchFlatMap(v -> {
834+
if (v == 100) {
835+
return Flowable.intervalRange(1, 3, 75, 100, TimeUnit.MILLISECONDS)
836+
.map(w -> "A" + w);
837+
} else
838+
if (v == 300) {
839+
return Flowable.intervalRange(1, 3, 10, 100, TimeUnit.MILLISECONDS)
840+
.map(w -> "B" + w);
841+
}
842+
return Flowable.intervalRange(1, 3, 20, 100, TimeUnit.MILLISECONDS)
843+
.map(w -> "C" + w);
844+
}, 2)
845+
.test()
846+
.awaitDone(5, TimeUnit.SECONDS)
847+
.assertResult("A1", "A2", "B1", "A3", "B2", "C1", B3", "C2", "C3);
848+
```
829849
830850
## Special Publisher implementations
831851

Diff for: src/main/java/hu/akarnokd/rxjava2/operators/FlowableSwitchFlatMap.java

+85-65
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.functions.Function;
2727
import io.reactivex.internal.functions.ObjectHelper;
2828
import io.reactivex.internal.fuseable.SimplePlainQueue;
29-
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
29+
import io.reactivex.internal.queue.SpscArrayQueue;
3030
import io.reactivex.internal.subscriptions.SubscriptionHelper;
3131
import io.reactivex.internal.util.*;
3232
import io.reactivex.plugins.RxJavaPlugins;
@@ -243,103 +243,127 @@ void updateInners() {
243243
}
244244

245245
void drain() {
246-
if (getAndIncrement() != 0) {
246+
if (getAndIncrement() == 0) {
247247
int missed = 1;
248248
Subscriber<? super R> a = actual;
249249
SfmInnerSubscriber<T, R>[] inners = activeCache;
250+
AtomicThrowable err = error;
250251

251252
outer:
252253
for (;;) {
253-
updateInners();
254-
long ver = versionCache;
255254
long r = requested.get();
256255
long e = 0;
257256

258-
if (cancelled) {
259-
clearCache();
260-
return;
261-
}
262-
263-
boolean d = done;
264-
265-
if (d) {
266-
Throwable ex = error.get();
267-
if (ex != null) {
268-
clearCache();
269257

270-
a.onError(error.get());
271-
return;
272-
} else
273-
if (inners[0] == null) {
274-
a.onComplete();
275-
return;
276-
}
277-
}
278-
279-
draining:
280-
for (SfmInnerSubscriber<T, R> inner : inners) {
258+
for (;;) {
281259
if (cancelled) {
282260
clearCache();
283261
return;
284262
}
285263

286-
if (inner == null) {
287-
break;
288-
}
289-
if (ver != version) {
290-
if (e != 0) {
291-
BackpressureHelper.produced(requested, e);
264+
boolean d = done;
265+
266+
updateInners();
267+
long ver = versionCache;
268+
269+
270+
if (d) {
271+
Throwable ex = err.get();
272+
if (ex != null) {
273+
clearCache();
274+
275+
a.onError(err.terminate());
276+
return;
277+
} else
278+
if (inners[0] == null) {
279+
a.onComplete();
280+
return;
292281
}
293-
continue outer;
294282
}
295283

296-
long f = 0;
297-
298-
SimplePlainQueue<R> q = inner.queue;
284+
int becameEmpty = 0;
285+
int activeCount = 0;
299286

300-
while (e != r) {
287+
draining:
288+
for (SfmInnerSubscriber<T, R> inner : inners) {
301289
if (cancelled) {
302290
clearCache();
303291
return;
304292
}
305293

294+
if (inner == null) {
295+
break;
296+
}
306297
if (ver != version) {
307298
if (e != 0) {
308299
BackpressureHelper.produced(requested, e);
309300
}
310-
if (f != 0L) {
311-
inner.produced(f);
312-
}
313301
continue outer;
314302
}
315303

316-
boolean d2 = inner.done;
317-
R v = q.poll();
318-
boolean empty = v == null;
304+
activeCount++;
319305

320-
if (d2 && empty) {
321-
remove(inner);
322-
continue draining;
323-
}
306+
long f = 0;
324307

325-
if (empty) {
326-
if (f != 0L) {
327-
inner.produced(f);
328-
f = 0L;
308+
SimplePlainQueue<R> q = inner.queue;
309+
310+
while (e != r) {
311+
if (cancelled) {
312+
clearCache();
313+
return;
329314
}
330-
break;
315+
316+
Throwable ex = err.get();
317+
if (ex != null) {
318+
clearCache();
319+
320+
a.onError(err.terminate());
321+
return;
322+
}
323+
324+
if (ver != version) {
325+
if (e != 0) {
326+
BackpressureHelper.produced(requested, e);
327+
}
328+
if (f != 0L) {
329+
inner.produced(f);
330+
}
331+
continue outer;
332+
}
333+
334+
boolean d2 = inner.done;
335+
R v = q.poll();
336+
boolean empty = v == null;
337+
338+
if (d2 && empty) {
339+
remove(inner);
340+
continue draining;
341+
}
342+
343+
if (empty) {
344+
if (f != 0L) {
345+
inner.produced(f);
346+
f = 0L;
347+
}
348+
becameEmpty++;
349+
break;
350+
}
351+
352+
a.onNext(v);
353+
e++;
354+
f++;
331355
}
332356

333-
a.onNext(v);
334-
e++;
335-
f++;
357+
if (inner.done && q.isEmpty()) {
358+
remove(inner);
359+
} else
360+
if (f != 0L) {
361+
inner.produced(f);
362+
}
336363
}
337364

338-
if (inner.done && q.isEmpty()) {
339-
remove(inner);
340-
} else
341-
if (f != 0L) {
342-
inner.produced(f);
365+
if (becameEmpty == activeCount || e == r) {
366+
break;
343367
}
344368
}
345369

@@ -376,17 +400,13 @@ static final class SfmInnerSubscriber<T, R> extends AtomicReference<Subscription
376400
this.parent = parent;
377401
this.bufferSize = bufferSize;
378402
this.limit = bufferSize - (bufferSize >> 2);
379-
this.queue = new SpscLinkedArrayQueue<R>(bufferSize);
403+
this.queue = new SpscArrayQueue<R>(bufferSize);
380404
}
381405

382406
void cancel() {
383407
SubscriptionHelper.cancel(this);
384408
}
385409

386-
boolean isCancelled() {
387-
return SubscriptionHelper.isCancelled(get());
388-
}
389-
390410
@Override
391411
public void onSubscribe(Subscription s) {
392412
if (SubscriptionHelper.setOnce(this, s)) {

0 commit comments

Comments
 (0)