Skip to content

Commit e313521

Browse files
committed
RefCount with subscriberCount and disconnect timeout
1 parent b8cb650 commit e313521

File tree

5 files changed

+599
-3
lines changed

5 files changed

+599
-3
lines changed

Diff for: README.md

+54-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RxJava 2.x implementation of extra sources, operators and components and ports o
1313

1414
```
1515
dependencies {
16-
compile "com.github.akarnokd:rxjava2-extensions:0.16.5"
16+
compile "com.github.akarnokd:rxjava2-extensions:0.17.0"
1717
}
1818
```
1919

@@ -46,7 +46,8 @@ Maven search:
4646
- [cacheLast()](#flowabletransformerscachelast), [timeoutLast()](#flowabletransformerstimeoutlast--timeoutlastabsolute), [timeoutLastAbsolute()](#flowabletransformerstimeoutlast--timeoutlastabsolute),
4747
- [debounceFirst()](#flowabletransformersdebouncefirst), [switchFlatMap()](#flowabletransformersswitchflatmap), [flatMapSync()](#flowabletransformersflatmapsync),
4848
- [flatMapAsync()](#flowabletransformersflatmapasync), [switchIfEmpty()](#flowabletransformersswitchifempty--switchifemptyarray),
49-
- [expand()](#flowabletransformersexpand), [mapAsync()](#flowabletransformersmapasync), [filterAsync()](#flowabletransformerfilterasync)
49+
- [expand()](#flowabletransformersexpand), [mapAsync()](#flowabletransformersmapasync), [filterAsync()](#flowabletransformerfilterasync),
50+
- [refCount()](#flowabletransformersrefcount)
5051
- [Custom parallel operators and transformers](#custom-parallel-operators-and-transformers)
5152
- [sumX()](#paralleltransformerssumx)
5253
- [Special Publisher implementations](#special-publisher-implementations)
@@ -1076,6 +1077,57 @@ Flowable.range(1, 10)
10761077
.assertResult(2, 4, 6, 8, 10);
10771078
```
10781079
1080+
### FlowableTransformers.refCount()
1081+
1082+
Offers the option to connect after a certain amount of subscribers have subscribed and/or specify a timeout
1083+
for disconnecting the upstream if all subscribers have unsubscribed. This allows keeping the connection alive if
1084+
there is a small window where new subscribers may subscribe after the previous set has unsubscribed. Note
1085+
that if the upstream to the transformer is not a `ConnectableFlowable`, the call to the transformer method
1086+
will throw an `IllegalArgumentException`.
1087+
1088+
This example will connect only after there is a second subscriber:
1089+
1090+
```java
1091+
Flowable<Integer> source = Flowable.range(1, 5)
1092+
.publish()
1093+
.compose(FlowableTransformers.refCount(2))
1094+
;
1095+
1096+
TestSubscriber<Integer> ts1 = source.test();
1097+
1098+
ts1.assertEmpty();
1099+
1100+
TestSubscriber<Integer> ts2 = source.test();
1101+
1102+
ts1.assertResult(1, 2, 3, 4, 5);
1103+
ts2.assertResult(1, 2, 3, 4, 5);
1104+
```
1105+
1106+
This example will disconnect only after a second:
1107+
1108+
```java
1109+
PublishProcessor<Integer> pp = PublishProcessor.create();
1110+
1111+
Flowable<Integer> source = pp
1112+
.publish()
1113+
.compose(FlowableTransformers.refCount(1, TimeUnit.SECONDS, Schedulers.single()));
1114+
1115+
assertFalse(pp.hasSubscribers());
1116+
1117+
TestSubscriber<Integer> ts = source.test();
1118+
1119+
assertTrue(pp.hasSubscribers());
1120+
1121+
ts.cancel();
1122+
1123+
assertTrue(pp.hasSubscribers());
1124+
1125+
Thread.sleep(1200);
1126+
1127+
assertFalse(pp.hasSubscribers());
1128+
```
1129+
1130+
10791131
## Custom parallel operators and transformers
10801132
10811133
### ParallelTransformers.sumX()

Diff for: gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.16.5
1+
version=0.17.0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
/*
2+
* Copyright 2016-2017 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.rxjava2.operators;
18+
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.concurrent.atomic.*;
21+
22+
import org.reactivestreams.*;
23+
24+
import io.reactivex.*;
25+
import io.reactivex.disposables.Disposable;
26+
import io.reactivex.flowables.ConnectableFlowable;
27+
import io.reactivex.functions.Consumer;
28+
import io.reactivex.internal.disposables.*;
29+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
30+
31+
/**
32+
* A refCount implementation that allows connecting to the source after the specified
33+
* number of Subscribers subscribed and allows disconnecting after a specified
34+
* grace period
35+
* @since 0.17.0
36+
*/
37+
final class FlowableRefCountTimeout<T> extends Flowable<T> implements FlowableTransformer<T, T> {
38+
39+
final ConnectableFlowable<T> source;
40+
41+
final int n;
42+
43+
final long timeout;
44+
45+
final TimeUnit unit;
46+
47+
final Scheduler scheduler;
48+
49+
RefConnection connection;
50+
51+
FlowableRefCountTimeout(ConnectableFlowable<T> source, int n, long timeout, TimeUnit unit,
52+
Scheduler scheduler) {
53+
this.source = source;
54+
this.n = n;
55+
this.timeout = timeout;
56+
this.unit = unit;
57+
this.scheduler = scheduler;
58+
}
59+
60+
@Override
61+
public Publisher<T> apply(Flowable<T> upstream) {
62+
if (upstream instanceof ConnectableFlowable) {
63+
return new FlowableRefCountTimeout<T>((ConnectableFlowable<T>)upstream, n, timeout, unit, scheduler);
64+
}
65+
throw new IllegalArgumentException("This transformer requires an upstream ConnectableFlowable");
66+
}
67+
68+
@Override
69+
protected void subscribeActual(Subscriber<? super T> s) {
70+
71+
for (;;) {
72+
RefConnection conn;
73+
74+
boolean connect = false;
75+
synchronized (this) {
76+
conn = connection;
77+
if (conn == null || conn.terminated) {
78+
conn = new RefConnection(this);
79+
connection = conn;
80+
}
81+
82+
long c = conn.subscriberCount;
83+
if (c == 0L && conn.timer != null) {
84+
conn.timer.dispose();
85+
}
86+
conn.subscriberCount = c + 1;
87+
if (!conn.connected && c + 1 == n) {
88+
connect = true;
89+
conn.connected = true;
90+
}
91+
}
92+
93+
source.subscribe(new RefCountSubscriber<T>(s, this, conn));
94+
95+
if (connect) {
96+
source.connect(conn);
97+
}
98+
99+
break;
100+
}
101+
}
102+
103+
void cancel(RefConnection rc) {
104+
SequentialDisposable sd;
105+
synchronized (this) {
106+
if (rc.terminated) {
107+
return;
108+
}
109+
long c = rc.subscriberCount - 1;
110+
rc.subscriberCount = c;
111+
if (c != 0L || !rc.connected) {
112+
return;
113+
}
114+
if (timeout == 0L) {
115+
timeout(rc);
116+
return;
117+
}
118+
sd = new SequentialDisposable();
119+
rc.timer = sd;
120+
}
121+
122+
sd.replace(scheduler.scheduleDirect(rc, timeout, unit));
123+
}
124+
125+
void terminated(RefConnection rc) {
126+
synchronized (this) {
127+
if (!rc.terminated) {
128+
rc.terminated = true;
129+
connection = null;
130+
}
131+
}
132+
}
133+
134+
void timeout(RefConnection rc) {
135+
synchronized (this) {
136+
if (rc.subscriberCount == 0 && rc == connection) {
137+
DisposableHelper.dispose(rc);
138+
if (source instanceof Disposable) {
139+
((Disposable)source).dispose();
140+
}
141+
connection = null;
142+
}
143+
}
144+
}
145+
146+
static final class RefConnection extends AtomicReference<Disposable>
147+
implements Runnable, Consumer<Disposable> {
148+
149+
private static final long serialVersionUID = -4552101107598366241L;
150+
151+
final FlowableRefCountTimeout<?> parent;
152+
153+
Disposable timer;
154+
155+
long subscriberCount;
156+
157+
boolean connected;
158+
159+
boolean terminated;
160+
161+
RefConnection(FlowableRefCountTimeout<?> parent) {
162+
this.parent = parent;
163+
}
164+
165+
@Override
166+
public void run() {
167+
parent.timeout(this);
168+
}
169+
170+
@Override
171+
public void accept(Disposable t) throws Exception {
172+
DisposableHelper.replace(this, t);
173+
}
174+
}
175+
176+
static final class RefCountSubscriber<T>
177+
extends AtomicBoolean implements FlowableSubscriber<T>, Subscription {
178+
179+
private static final long serialVersionUID = -7419642935409022375L;
180+
181+
final Subscriber<? super T> actual;
182+
183+
final FlowableRefCountTimeout<T> parent;
184+
185+
final RefConnection connection;
186+
187+
Subscription upstream;
188+
189+
RefCountSubscriber(Subscriber<? super T> actual, FlowableRefCountTimeout<T> parent, RefConnection connection) {
190+
this.actual = actual;
191+
this.parent = parent;
192+
this.connection = connection;
193+
}
194+
195+
@Override
196+
public void onNext(T t) {
197+
actual.onNext(t);
198+
}
199+
200+
@Override
201+
public void onError(Throwable t) {
202+
actual.onError(t);
203+
if (compareAndSet(false, true)) {
204+
parent.terminated(connection);
205+
}
206+
}
207+
208+
@Override
209+
public void onComplete() {
210+
actual.onComplete();
211+
if (compareAndSet(false, true)) {
212+
parent.terminated(connection);
213+
}
214+
}
215+
216+
@Override
217+
public void request(long n) {
218+
upstream.request(n);
219+
}
220+
221+
@Override
222+
public void cancel() {
223+
upstream.cancel();
224+
if (compareAndSet(false, true)) {
225+
parent.cancel(connection);
226+
}
227+
}
228+
229+
@Override
230+
public void onSubscribe(Subscription s) {
231+
if (SubscriptionHelper.validate(upstream, s)) {
232+
this.upstream = s;
233+
234+
actual.onSubscribe(this);
235+
}
236+
}
237+
}
238+
}

0 commit comments

Comments
 (0)