Skip to content

Commit 402a966

Browse files
committed
Remove type argument from CompletableSubject
1 parent 08b5589 commit 402a966

File tree

2 files changed

+29
-37
lines changed

2 files changed

+29
-37
lines changed

src/main/java/hu/akarnokd/rxjava2/subjects/CompletableSubject.java

+22-30
Original file line numberDiff line numberDiff line change
@@ -31,34 +31,29 @@
3131
* <p>
3232
* The CompletableSubject doesn't store the Disposables coming through onSubscribe but
3333
* disposes them once the other onXXX methods were called (terminal state reached).
34-
* @param <T> the value type received and emitted
3534
*/
36-
public final class CompletableSubject<T> extends Completable implements CompletableObserver {
35+
public final class CompletableSubject extends Completable implements CompletableObserver {
3736

38-
final AtomicReference<MaybeDisposable<T>[]> observers;
37+
final AtomicReference<CompletableDisposable[]> observers;
3938

40-
@SuppressWarnings("rawtypes")
41-
static final MaybeDisposable[] EMPTY = new MaybeDisposable[0];
39+
static final CompletableDisposable[] EMPTY = new CompletableDisposable[0];
4240

43-
@SuppressWarnings("rawtypes")
44-
static final MaybeDisposable[] TERMINATED = new MaybeDisposable[0];
41+
static final CompletableDisposable[] TERMINATED = new CompletableDisposable[0];
4542

4643
final AtomicBoolean once;
4744
Throwable error;
4845

4946
/**
5047
* Creates a fresh CompletableSubject.
51-
* @param <T> the value type received and emitted
5248
* @return the new CompletableSubject instance
5349
*/
54-
public static <T> CompletableSubject<T> create() {
55-
return new CompletableSubject<T>();
50+
public static CompletableSubject create() {
51+
return new CompletableSubject();
5652
}
5753

58-
@SuppressWarnings("unchecked")
5954
CompletableSubject() {
6055
once = new AtomicBoolean();
61-
observers = new AtomicReference<MaybeDisposable<T>[]>(EMPTY);
56+
observers = new AtomicReference<CompletableDisposable[]>(EMPTY);
6257
}
6358

6459
@Override
@@ -68,35 +63,33 @@ public void onSubscribe(Disposable d) {
6863
}
6964
}
7065

71-
@SuppressWarnings("unchecked")
7266
@Override
7367
public void onError(Throwable e) {
7468
if (e == null) {
7569
e = new NullPointerException("Null errors are not allowed in 2.x");
7670
}
7771
if (once.compareAndSet(false, true)) {
7872
this.error = e;
79-
for (MaybeDisposable<T> md : observers.getAndSet(TERMINATED)) {
73+
for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
8074
md.actual.onError(e);
8175
}
8276
} else {
8377
RxJavaPlugins.onError(e);
8478
}
8579
}
8680

87-
@SuppressWarnings("unchecked")
8881
@Override
8982
public void onComplete() {
9083
if (once.compareAndSet(false, true)) {
91-
for (MaybeDisposable<T> md : observers.getAndSet(TERMINATED)) {
84+
for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
9285
md.actual.onComplete();
9386
}
9487
}
9588
}
9689

9790
@Override
9891
protected void subscribeActual(CompletableObserver observer) {
99-
MaybeDisposable<T> md = new MaybeDisposable<T>(observer, this);
92+
CompletableDisposable md = new CompletableDisposable(observer, this);
10093
observer.onSubscribe(md);
10194
if (add(md)) {
10295
if (md.isDisposed()) {
@@ -112,16 +105,16 @@ protected void subscribeActual(CompletableObserver observer) {
112105
}
113106
}
114107

115-
boolean add(MaybeDisposable<T> inner) {
108+
boolean add(CompletableDisposable inner) {
116109
for (;;) {
117-
MaybeDisposable<T>[] a = observers.get();
110+
CompletableDisposable[] a = observers.get();
118111
if (a == TERMINATED) {
119112
return false;
120113
}
121114

122115
int n = a.length;
123-
@SuppressWarnings("unchecked")
124-
MaybeDisposable<T>[] b = new MaybeDisposable[n + 1];
116+
117+
CompletableDisposable[] b = new CompletableDisposable[n + 1];
125118
System.arraycopy(a, 0, b, 0, n);
126119
b[n] = inner;
127120
if (observers.compareAndSet(a, b)) {
@@ -130,10 +123,9 @@ boolean add(MaybeDisposable<T> inner) {
130123
}
131124
}
132125

133-
@SuppressWarnings("unchecked")
134-
void remove(MaybeDisposable<T> inner) {
126+
void remove(CompletableDisposable inner) {
135127
for (;;) {
136-
MaybeDisposable<T>[] a = observers.get();
128+
CompletableDisposable[] a = observers.get();
137129
int n = a.length;
138130
if (n == 0) {
139131
return;
@@ -151,11 +143,11 @@ void remove(MaybeDisposable<T> inner) {
151143
if (j < 0) {
152144
return;
153145
}
154-
MaybeDisposable<T>[] b;
146+
CompletableDisposable[] b;
155147
if (n == 1) {
156148
b = EMPTY;
157149
} else {
158-
b = new MaybeDisposable[n - 1];
150+
b = new CompletableDisposable[n - 1];
159151
System.arraycopy(a, 0, b, 0, j);
160152
System.arraycopy(a, j + 1, b, j, n - j - 1);
161153
}
@@ -209,20 +201,20 @@ public boolean hasObservers() {
209201
return observers.get().length;
210202
}
211203

212-
static final class MaybeDisposable<T>
213-
extends AtomicReference<CompletableSubject<T>> implements Disposable {
204+
static final class CompletableDisposable
205+
extends AtomicReference<CompletableSubject> implements Disposable {
214206
private static final long serialVersionUID = -7650903191002190468L;
215207

216208
final CompletableObserver actual;
217209

218-
MaybeDisposable(CompletableObserver actual, CompletableSubject<T> parent) {
210+
CompletableDisposable(CompletableObserver actual, CompletableSubject parent) {
219211
this.actual = actual;
220212
lazySet(parent);
221213
}
222214

223215
@Override
224216
public void dispose() {
225-
CompletableSubject<T> parent = getAndSet(null);
217+
CompletableSubject parent = getAndSet(null);
226218
if (parent != null) {
227219
parent.remove(this);
228220
}

src/test/java/hu/akarnokd/rxjava2/subjects/CompletableSubjectTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class CompletableSubjectTest {
3434

3535
@Test
3636
public void once() {
37-
CompletableSubject<Integer> ms = CompletableSubject.create();
37+
CompletableSubject ms = CompletableSubject.create();
3838

3939
TestObserver<Void> to = ms.test();
4040

@@ -55,7 +55,7 @@ public void once() {
5555

5656
@Test
5757
public void error() {
58-
CompletableSubject<Integer> ms = CompletableSubject.create();
58+
CompletableSubject ms = CompletableSubject.create();
5959

6060
assertFalse(ms.hasComplete());
6161
assertFalse(ms.hasThrowable());
@@ -91,7 +91,7 @@ public void error() {
9191

9292
@Test
9393
public void complete() {
94-
CompletableSubject<Integer> ms = CompletableSubject.create();
94+
CompletableSubject ms = CompletableSubject.create();
9595

9696
assertFalse(ms.hasComplete());
9797
assertFalse(ms.hasThrowable());
@@ -127,7 +127,7 @@ public void complete() {
127127

128128
@Test
129129
public void nullThrowable() {
130-
CompletableSubject<Integer> ms = CompletableSubject.create();
130+
CompletableSubject ms = CompletableSubject.create();
131131

132132
TestObserver<Void> to = ms.test();
133133

@@ -145,7 +145,7 @@ public void cancelOnArrival() {
145145

146146
@Test
147147
public void cancelOnArrival2() {
148-
CompletableSubject<Integer> ms = CompletableSubject.create();
148+
CompletableSubject ms = CompletableSubject.create();
149149

150150
ms.test();
151151

@@ -187,7 +187,7 @@ public void onComplete() {
187187

188188
@Test
189189
public void onSubscribeDispose() {
190-
CompletableSubject<Integer> ms = CompletableSubject.create();
190+
CompletableSubject ms = CompletableSubject.create();
191191

192192
Disposable d = Disposables.empty();
193193

@@ -207,7 +207,7 @@ public void onSubscribeDispose() {
207207
@Test
208208
public void addRemoveRace() {
209209
for (int i = 0; i < 500; i++) {
210-
final CompletableSubject<Integer> ms = CompletableSubject.create();
210+
final CompletableSubject ms = CompletableSubject.create();
211211

212212
final TestObserver<Void> to = ms.test();
213213

0 commit comments

Comments
 (0)