Skip to content

Commit 885831b

Browse files
committed
+ every, +repeat, repeatCallable
1 parent 70d9754 commit 885831b

File tree

9 files changed

+884
-1
lines changed

9 files changed

+884
-1
lines changed

README.md

+36
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,42 @@ Flowable.intervalRange(1, 5, 100, 100, TimeUnit.MILLISECONDS)
688688
.assertResult();
689689
```
690690

691+
### Flowables.repeat
692+
693+
Repeats a scalar value indefinitely (until the downstream actually cancels), honoring backpressure and supporting synchronous fusion and/or conditional fusion.
694+
695+
```java
696+
Flowable.repeat("doesn't matter")
697+
.map(v -> ThreadLocalRandom.current().nextDouble())
698+
.take(100)
699+
.all(v -> v < 1d)
700+
.test()
701+
.assertResult(true);
702+
```
703+
704+
### Flowables.repeatCallable
705+
706+
Repeatedly calls a callable, indefinitely (until the downstream actually cancels) or if the callable throws or returns null (when it signals `NullPointerException`), honoring backpressure and supporting synchronous fusion and/or conditional fusion.
707+
708+
```java
709+
Flowable.repeatCallable(() -> ThreadLocalRandom.current().nextDouble())
710+
.take(100)
711+
.all(v -> v < 1d)
712+
.test()
713+
.assertResult(true);
714+
```
715+
716+
### FlowableTransformers.every
717+
718+
Relays every Nth item from upstream (skipping the in-between items).
719+
720+
```java
721+
Flowable.range(1, 5)
722+
.compose(FlowableTransformers.<Integer>every(2))
723+
.test()
724+
.assertResult(2, 4)
725+
```
726+
691727
## Special Publisher implementations
692728

693729
### Nono - 0-error publisher

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.14.1
1+
version=0.14.2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2016 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.rxjava2.operators;
18+
19+
import org.reactivestreams.*;
20+
21+
import io.reactivex.*;
22+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
23+
import io.reactivex.internal.util.BackpressureHelper;
24+
25+
/**
26+
* Emits only every Nth item.
27+
*
28+
* @param <T> the value type
29+
*
30+
* @since 0.14.2
31+
*/
32+
final class FlowableEvery<T> extends Flowable<T> implements FlowableTransformer<T, T> {
33+
34+
final Publisher<T> source;
35+
36+
final long keep;
37+
38+
FlowableEvery(Publisher<T> source, long keep) {
39+
this.source = source;
40+
this.keep = keep;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(Subscriber<? super T> s) {
45+
source.subscribe(new EverySubscriber<T>(s, keep));
46+
}
47+
48+
@Override
49+
public Publisher<T> apply(Flowable<T> upstream) {
50+
return new FlowableEvery<T>(upstream, keep);
51+
}
52+
53+
static final class EverySubscriber<T> implements Subscriber<T>, Subscription {
54+
55+
final Subscriber<? super T> actual;
56+
57+
final long keep;
58+
59+
long index;
60+
61+
Subscription s;
62+
63+
public EverySubscriber(Subscriber<? super T> actual, long keep) {
64+
this.actual = actual;
65+
this.keep = keep;
66+
}
67+
68+
@Override
69+
public void onSubscribe(Subscription s) {
70+
if (SubscriptionHelper.validate(this.s, s)) {
71+
this.s = s;
72+
73+
actual.onSubscribe(this);
74+
}
75+
}
76+
77+
@Override
78+
public void onNext(T t) {
79+
long i = index + 1;
80+
if (i == keep) {
81+
index = 0;
82+
actual.onNext(t);
83+
} else {
84+
index = i;
85+
}
86+
}
87+
88+
@Override
89+
public void onError(Throwable t) {
90+
actual.onError(t);
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
actual.onComplete();
96+
}
97+
98+
@Override
99+
public void request(long n) {
100+
if (SubscriptionHelper.validate(n)) {
101+
long u = BackpressureHelper.multiplyCap(n, keep);
102+
s.request(u);
103+
}
104+
}
105+
106+
@Override
107+
public void cancel() {
108+
s.cancel();
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)