Skip to content

Commit dac1fa0

Browse files
committed
fix the usage of deprecated methods.
1 parent a4e8dc9 commit dac1fa0

File tree

2 files changed

+38
-59
lines changed

2 files changed

+38
-59
lines changed

src/main/java/rx/observables/StringObservable.java

+14-35
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import rx.Observable.OnSubscribe;
2020
import rx.Observable.Operator;
2121
import rx.Subscriber;
22-
import rx.Subscription;
22+
import rx.functions.Action1;
2323
import rx.functions.Func0;
2424
import rx.functions.Func1;
2525
import rx.functions.Func2;
@@ -38,7 +38,6 @@
3838
import java.nio.charset.CodingErrorAction;
3939
import java.util.Arrays;
4040
import java.util.concurrent.Callable;
41-
import java.util.concurrent.atomic.AtomicBoolean;
4241
import java.util.regex.Pattern;
4342

4443
public class StringObservable {
@@ -56,34 +55,9 @@ public static Observable<byte[]> from(final InputStream i) {
5655
return from(i, 8 * 1024);
5756
}
5857

59-
private static class CloseableResource<S extends Closeable> implements Subscription {
60-
private final AtomicBoolean unsubscribed = new AtomicBoolean();
61-
private S closable;
62-
63-
public CloseableResource(S closeable) {
64-
this.closable = closeable;
65-
}
66-
67-
@Override
68-
public void unsubscribe() {
69-
if (unsubscribed.compareAndSet(false, true)) {
70-
try {
71-
closable.close();
72-
} catch (Exception e) {
73-
throw new RuntimeException(e);
74-
}
75-
}
76-
}
77-
78-
@Override
79-
public boolean isUnsubscribed() {
80-
return unsubscribed.get();
81-
}
82-
}
83-
8458
/**
8559
* Func0 that allows throwing an {@link IOException}s commonly thrown during IO operations.
86-
* @see StringObservable#from(UnsafeFunc0, UnsafeFunc1)
60+
* @see StringObservable#using(UnsafeFunc0, Func1)
8761
*
8862
* @param <R>
8963
*/
@@ -103,22 +77,27 @@ public static interface UnsafeFunc0<R> extends Callable<R> {
10377
* @param observableFactory
10478
* Converts the {@link Closeable} resource into a {@link Observable} with {@link #from(InputStream)} or {@link #from(Reader)}
10579
* @return
80+
* An {@link Observable} that automatically closes the resource when done.
10681
*/
10782
public static <R, S extends Closeable> Observable<R> using(final UnsafeFunc0<S> resourceFactory,
10883
final Func1<S, Observable<R>> observableFactory) {
109-
return Observable.using(new Func0<CloseableResource<S>>() {
84+
return Observable.using(new Func0<S>() {
11085
@Override
111-
public CloseableResource<S> call() {
86+
public S call() {
11287
try {
113-
return new CloseableResource<S>(resourceFactory.call());
88+
return resourceFactory.call();
11489
} catch (Throwable e) {
11590
throw new RuntimeException(e);
11691
}
11792
}
118-
}, new Func1<CloseableResource<S>, Observable<R>>() {
93+
}, observableFactory, new Action1<S>() {
11994
@Override
120-
public Observable<R> call(CloseableResource<S> t1) {
121-
return observableFactory.call(t1.closable);
95+
public void call(S resource) {
96+
try {
97+
resource.close();
98+
} catch (IOException e) {
99+
throw new RuntimeException(e);
100+
}
122101
}
123102
});
124103
}
@@ -403,7 +382,7 @@ public StringBuilder call(StringBuilder a, String b) {
403382
/**
404383
* Maps {@link Observable}&lt;{@link Object}&gt; to {@link Observable}&lt;{@link String}&gt; by using {@link String#valueOf(Object)}
405384
* @param src
406-
* @return
385+
* @return An {@link Observable} of only {@link String}s.
407386
*/
408387
public static Observable<String> toString(Observable<?> src) {
409388
return src.map(new Func1<Object, String>() {

src/test/java/rx/observables/StringObservableTest.java

+24-24
Original file line numberDiff line numberDiff line change
@@ -62,51 +62,51 @@ public class StringObservableTest {
6262

6363
@Test
6464
public void testMultibyteSpanningTwoBuffers() {
65-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 });
66-
String out = StringObservable.decode(src, "UTF-8").toBlockingObservable().single();
65+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 }, new byte[] { (byte) 0xa1 });
66+
String out = StringObservable.decode(src, "UTF-8").toBlocking().single();
6767

6868
assertEquals("\u00A1", out);
6969
}
7070

7171
@Test
7272
public void testMalformedAtTheEndReplace() {
73-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
74-
String out = decode(src, "UTF-8").toBlockingObservable().single();
73+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
74+
String out = decode(src, "UTF-8").toBlocking().single();
7575

7676
// REPLACEMENT CHARACTER
7777
assertEquals("\uFFFD", out);
7878
}
7979

8080
@Test
8181
public void testMalformedInTheMiddleReplace() {
82-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2, 65 });
83-
String out = decode(src, "UTF-8").toBlockingObservable().single();
82+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2, 65 });
83+
String out = decode(src, "UTF-8").toBlocking().single();
8484

8585
// REPLACEMENT CHARACTER
8686
assertEquals("\uFFFDA", out);
8787
}
8888

8989
@Test(expected = RuntimeException.class)
9090
public void testMalformedAtTheEndReport() {
91-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
91+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
9292
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
93-
decode(src, charsetDecoder).toBlockingObservable().single();
93+
decode(src, charsetDecoder).toBlocking().single();
9494
}
9595

9696
@Test(expected = RuntimeException.class)
9797
public void testMalformedInTheMiddleReport() {
98-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2, 65 });
98+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2, 65 });
9999
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
100-
decode(src, charsetDecoder).toBlockingObservable().single();
100+
decode(src, charsetDecoder).toBlocking().single();
101101
}
102102

103103
@Test
104104
public void testPropogateError() {
105-
Observable<byte[]> src = Observable.from(new byte[] { 65 });
105+
Observable<byte[]> src = Observable.just(new byte[] { 65 });
106106
Observable<byte[]> err = Observable.error(new IOException());
107107
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
108108
try {
109-
decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
109+
decode(Observable.concat(src, err), charsetDecoder).toList().toBlocking().single();
110110
fail();
111111
} catch (RuntimeException e) {
112112
assertEquals(IOException.class, e.getCause().getClass());
@@ -115,11 +115,11 @@ public void testPropogateError() {
115115

116116
@Test
117117
public void testPropogateErrorInTheMiddleOfMultibyte() {
118-
Observable<byte[]> src = Observable.from(new byte[] { (byte) 0xc2 });
118+
Observable<byte[]> src = Observable.just(new byte[] { (byte) 0xc2 });
119119
Observable<byte[]> err = Observable.error(new IOException());
120120
CharsetDecoder charsetDecoder = Charset.forName("UTF-8").newDecoder();
121121
try {
122-
decode(Observable.concat(src, err), charsetDecoder).toList().toBlockingObservable().single();
122+
decode(Observable.concat(src, err), charsetDecoder).toList().toBlocking().single();
123123
fail();
124124
} catch (RuntimeException e) {
125125
assertEquals(MalformedInputException.class, e.getCause().getClass());
@@ -130,7 +130,7 @@ public void testPropogateErrorInTheMiddleOfMultibyte() {
130130
public void testEncode() {
131131
assertArrayEquals(
132132
new byte[] { (byte) 0xc2, (byte) 0xa1 }, encode(Observable.just("\u00A1"), "UTF-8")
133-
.toBlockingObservable().single());
133+
.toBlocking().single());
134134
}
135135

136136
@Test
@@ -144,11 +144,11 @@ public void testSplitOnOh() {
144144
}
145145

146146
public void testSplit(String str, String regex, int limit, String... parts) {
147-
testSplit(str, regex, 0, Observable.from(str), parts);
147+
testSplit(str, regex, 0, Observable.just(str), parts);
148148
for (int i = 0; i < str.length(); i++) {
149149
String a = str.substring(0, i);
150150
String b = str.substring(i, str.length());
151-
testSplit(a + "|" + b, regex, limit, Observable.from(a, b), parts);
151+
testSplit(a + "|" + b, regex, limit, Observable.just(a, b), parts);
152152
}
153153
}
154154

@@ -176,7 +176,7 @@ public void testJoinMixed() {
176176

177177
@Test
178178
public void testJoinWithEmptyString() {
179-
Observable<String> source = Observable.from("", "b", "c");
179+
Observable<String> source = Observable.just("", "b", "c");
180180

181181
Observable<String> result = join(source, ", ");
182182

@@ -192,7 +192,7 @@ public void testJoinWithEmptyString() {
192192

193193
@Test
194194
public void testJoinWithNull() {
195-
Observable<String> source = Observable.from("a", null, "c");
195+
Observable<String> source = Observable.just("a", null, "c");
196196

197197
Observable<String> result = join(source, ", ");
198198

@@ -208,7 +208,7 @@ public void testJoinWithNull() {
208208

209209
@Test
210210
public void testJoinSingle() {
211-
Observable<String> source = Observable.from("a");
211+
Observable<String> source = Observable.just("a");
212212

213213
Observable<String> result = join(source, ", ");
214214

@@ -258,7 +258,7 @@ public void testJoinThrows() {
258258
@Test
259259
public void testFromInputStream() {
260260
final byte[] inBytes = "test".getBytes();
261-
final byte[] outBytes = from(new ByteArrayInputStream(inBytes)).toBlockingObservable().single();
261+
final byte[] outBytes = from(new ByteArrayInputStream(inBytes)).toBlocking().single();
262262
assertNotSame(inBytes, outBytes);
263263
assertArrayEquals(inBytes, outBytes);
264264
}
@@ -275,14 +275,14 @@ public synchronized int read(byte[] b, int off, int len) {
275275
return super.read(b, off, len);
276276
}
277277
};
278-
StringObservable.from(is).first().toBlockingObservable().single();
278+
StringObservable.from(is).first().toBlocking().single();
279279
assertEquals(1, numReads.get());
280280
}
281281

282282
@Test
283283
public void testFromReader() {
284284
final String inStr = "test";
285-
final String outStr = from(new StringReader(inStr)).toBlockingObservable().single();
285+
final String outStr = from(new StringReader(inStr)).toBlocking().single();
286286
assertNotSame(inStr, outStr);
287287
assertEquals(inStr, outStr);
288288
}
@@ -292,7 +292,7 @@ public void testByLine() {
292292
String newLine = System.getProperty("line.separator");
293293

294294
List<Line> lines = byLine(Observable.from(Arrays.asList("qwer", newLine + "asdf" + newLine, "zx", "cv")))
295-
.toList().toBlockingObservable().single();
295+
.toList().toBlocking().single();
296296

297297
assertEquals(Arrays.asList(new Line(0, "qwer"), new Line(1, "asdf"), new Line(2, "zxcv")), lines);
298298
}

0 commit comments

Comments
 (0)