From 84d53bee3e42be8dc31d4e3ef9f90c5c20f33a7e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 3 Aug 2018 11:16:40 +0200 Subject: [PATCH] 2.x: Expand Creating-Observables.md wiki --- docs/Creating-Observables.md | 416 ++++++++++++++++++++++++++++++++++- 1 file changed, 404 insertions(+), 12 deletions(-) diff --git a/docs/Creating-Observables.md b/docs/Creating-Observables.md index b8a6b90ca7..b167c4d866 100644 --- a/docs/Creating-Observables.md +++ b/docs/Creating-Observables.md @@ -1,12 +1,404 @@ -This page shows methods that create Observables. - -* [**`just( )`**](http://reactivex.io/documentation/operators/just.html) — convert an object or several objects into an Observable that emits that object or those objects -* [**`from( )`**](http://reactivex.io/documentation/operators/from.html) — convert an Iterable, a Future, or an Array into an Observable -* [**`create( )`**](http://reactivex.io/documentation/operators/create.html) — **advanced use only!** create an Observable from scratch by means of a function -* [**`defer( )`**](http://reactivex.io/documentation/operators/defer.html) — do not create the Observable until a Subscriber subscribes; create a fresh Observable on each subscription -* [**`range( )`**](http://reactivex.io/documentation/operators/range.html) — create an Observable that emits a range of sequential integers -* [**`interval( )`**](http://reactivex.io/documentation/operators/interval.html) — create an Observable that emits a sequence of integers spaced by a given time interval -* [**`timer( )`**](http://reactivex.io/documentation/operators/timer.html) — create an Observable that emits a single item after a given delay -* [**`empty( )`**](http://reactivex.io/documentation/operators/empty-never-throw.html) — create an Observable that emits nothing and then completes -* [**`error( )`**](http://reactivex.io/documentation/operators/empty-never-throw.html) — create an Observable that emits nothing and then signals an error -* [**`never( )`**](http://reactivex.io/documentation/operators/empty-never-throw.html) — create an Observable that emits nothing at all +This page shows methods that create reactive sources, such as `Observable`s. + +### Outline + +- [`create`](#create) +- [`defer`](#defer) +- [`empty`](#empty) +- [`error`](#error) +- [`from`](#from) +- [`interval`](#interval) +- [`just`](#just) +- [`never`](#never) +- [`range`](#range) +- [`timer`](#timer) + +## just + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/just.html](http://reactivex.io/documentation/operators/just.html) + +Constructs a reactive type by taking a pre-existing object and emitting that specific object to the downstream consumer upon subscription. + +#### just example: + +```java +String greeting = "Hello world!"; + +Observable observable = Observable.just(greeting); + +observable.subscribe(item -> System.out.println(item)); +``` + +There exist overloads with 2 to 9 arguments for convenience, which objects (with the same common type) will be emitted in the order they are specified. + +```java +Observable observable = Observable.just("1", "A", "3.2", "def"); + +observable.subscribe(item -> System.out.print(item), error -> error.printStackTrace, + () -> System.out.println()); +``` + +## From + +Constructs a sequence from a pre-existing source or generator type. + +*Note: These static methods use the postfix naming convention (i.e., the argument type is repeated in the method name) to avoid overload resolution ambiguities.* + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/from.html](http://reactivex.io/documentation/operators/from.html) + +### fromIterable + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Completable` + +Signals the items from a `java.lang.Iterable` source (such as `List`s, `Set`s or `Collection`s or custom `Iterable`s) and then completes the sequence. + +#### fromIterable example: + +```java +List list = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); + +Observable observable = Observable.fromIterable(list); + +observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), + () -> System.out.println("Done")); +``` + +### fromArray + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Completable` + +Signals the elements of the given array and then completes the sequence. + +#### fromArray example: + +```java +Integer[] array = new Integer[10]; +for (int i = 0; i < array.length; i++) { + array[i] = i; +} + +Observable observable = Observable.fromIterable(array); + +observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), + () -> System.out.println("Done")); +``` + +*Note: RxJava does not support primitive arrays, only (generic) reference arrays.* + +### fromCallable + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +When a consumer subscribes, the given `java.util.concurrent.Callable` is invoked and its returned value (or thrown exception) is relayed to that consumer. + +#### fromCallable example: + +```java +Callable callable = () -> { + System.out.println("Hello World!"); + return "Hello World!"); +} + +Observable observable = Observable.fromCallable(callable); + +observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), + () -> System.out.println("Done")); +``` + +*Remark: In `Completable`, the actual returned value is ignored and the `Completable` simply completes.* + +## fromAction + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +When a consumer subscribes, the given `io.reactivex.function.Action` is invoked and the consumer completes or receives the exception the `Action` threw. + +#### fromAction example: + +```java +Action action = () -> System.out.println("Hello World!"); + +Completable completable = Completable.fromAction(action); + +completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace()); +``` + +*Note: the difference between `fromAction` and `fromRunnable` is that the `Action` interface allows throwing a checked exception while the `java.lang.Runnable` does not.* + +## fromRunnable + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +When a consumer subscribes, the given `io.reactivex.function.Action` is invoked and the consumer completes or receives the exception the `Action` threw. + +#### fromRunnable example: + +```java +Runnable runnable = () -> System.out.println("Hello World!"); + +Completable completable = Completable.fromRunnable(runnable); + +completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace()); +``` + +*Note: the difference between `fromAction` and `fromRunnable` is that the `Action` interface allows throwing a checked exception while the `java.lang.Runnable` does not.* + +### fromFuture + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +Given a pre-existing, already running or already completed `java.util.concurrent.Future`, wait for the `Future` to complete normally or with an exception in a blocking fashion and relay the produced value or exception to the consumers. + +#### fromFuture example: + +```java +ScheduledExecutorService executor = Executors.newSingleThreadedScheduledExecutor(); + +Future future = executor.schedule(() -> "Hello world!", 1, TimeUnit.SECONDS); + +Observable observable = Observable.fromFuture(future); + +observable.subscribe( + item -> System.out.println(item), + error -> error.printStackTrace(), + () -> System.out.println("Done")); + +executor.shutdown(); +``` + +### from{reactive type} + +Wraps or converts another reactive type to the target reactive type. + +The following combinations are available in the various reactive types with the following signature pattern: `targetType.from{sourceType}()` + +**Available in:** + +targetType \ sourceType | Publisher | Observable | Maybe | Single | Completable +----|---------------|-----------|---------|-----------|---------------- +Flowable | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | | | | | +Observable | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | | | | | +Maybe | | | | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) +Single | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | | | +Completable | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) | + +*Note: not all possible conversion is implemented via the `from{reactive type}` method families. Check out the `to{reactive type}` method families for further conversion possibilities. + +#### from{reactive type} example: + +```java +Flux reactorFlux = Flux.fromCompletionStage(CompletableFuture.completedFuture(1)); + +Observable observable = Observable.fromPublisher(reactorFlux); + +observable.subscribe( + item -> System.out.println(item), + error -> error.printStackTrace(), + () -> System.out.println("Done")); +``` + +## create + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/create.html](http://reactivex.io/documentation/operators/create.html) + +Construct a **safe** reactive type instance which when subscribed to by a consumer, runs an user-provided function and provides a type-specific `Emitter` for this function to generate the signal(s) the designated business logic requires. This method allows bridging the non-reactive, usually listener/callback-style world, with the reactive world. + +#### create example: + +```java +ScheduledExecutorService executor = Executors.newSingleThreadedScheduledExecutor(); + +ObservableOnSubscribe handler = emitter -> { + + Future future = executor.schedule(() -> { + emitter.onNext("Hello"); + emitter.onNext("World"); + emitter.onComplete(); + return null; + }, 1, TimeUnit.SECONDS); + + emitter.setCancellable(() -> future.cancel(false)); +}; + +Observable observable = Observable.create(handler); + +observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), + () -> System.out.println("Done")); + +Thread.sleep(2000); +executor.shutdown(); +``` + +*Note: `Flowable.create()` must also specify the backpressure behavior to be applied when the user-provided function generates more items than the downstream consumer has requested.* + +## defer + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/defer.html](http://reactivex.io/documentation/operators/defer.html) + +Calls an user-provided `java.util.concurrent.Callable` when a consumer subscribes to the reactive type so that the `Callable` can generate the actual reactive instance to relay signals from towards the consumer. `defer` allows: + +- associating a per-consumer state with such generated reactive instances, +- allows executing side-effects before an actual/generated reactive instance gets subscribed to, +- turn hot sources (i.e., `Subject`s and `Processor`s) into cold sources by basically making those hot sources not exist until a consumer subscribes. + +#### defer example: + +```java +Observable observable = Observable.defer(() -> { + long time = System.currentTimeMillis(); + return Observable.just(time); +}); + +observable.subscribe(time -> System.out.println(time)); + +Thread.sleep(1000); + +observable.subscribe(time -> System.out.println(time)); +``` + +## range + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/range.html](http://reactivex.io/documentation/operators/range.html) + +Generates a sequence of values to each individual consumer. The `range()` method generates `Integer`s, the `rangeLong()` generates `Long`s. + +#### range example: +```java +String greeting = "Hello World!"; + +Observable indexes = Observable.range(0, greeting.length()); + +Observable characters = indexes + .map(index -> greeting.charAt(index)); + +characters.subscribe(character -> System.out.print(character), erro -> error.printStackTrace(), + () -> System.out.println()); +``` + +## interval + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/interval.html](http://reactivex.io/documentation/operators/interval.html) + +Periodically generates an infinite, ever increasing numbers (of type `Long`). The `intervalRange` variant generates a limited amount of such numbers. + +#### interval example: + +```java +Observable clock = Observable.interval(1, TimeUnit.SECONDS); + +clock.subscribe(time -> { + if (time % 2 == 0) { + System.out.println("Tick"); + } else { + System.out.println("Tock"); + } +}); +``` + +## timer + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/timer.html](http://reactivex.io/documentation/operators/timer.html) + +After the specified time, this reactive source signals a single `0L` (then completes for `Flowable` and `Observable`). + +#### timer example: + +```java +Observable eggTimer = Observable.timer(5, TimeUnit.MINUTES); + +eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!")); +``` + +## empty + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_off.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/empty-never-throw.html](http://reactivex.io/documentation/operators/empty-never-throw.html) + +This type of source signals completion immediately upon subscription. + +#### empty example: + +```java +Observable empty = Observable.empty(); + +empty.subscribe( + v -> System.out.println("This should never be printed!"), + error -> System.out.println("Or this!"), + () -> System.out.println("Done will be printed.")); +``` + +## never + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/empty-never-throw.html](http://reactivex.io/documentation/operators/empty-never-throw.html) + +This type of source does not signal any `onNext`, `onSuccess`, `onError` or `onComplete`. This type of reactive source is useful in testing or "disabling" certain sources in combinator operators. + +#### never example: + +```java +Observable never = Observable.never(); + +never.subscribe( + v -> System.out.println("This should never be printed!"), + error -> System.out.println("Or this!"), + () -> System.out.println("This neither!")); +``` + +## error + +**Available in:** ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Flowable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Observable`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Maybe`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Single`, ![image](https://raw.github.com/wiki/ReactiveX/RxJava/images/checkmark_on.png) `Completable` + +**ReactiveX doumentation:** [http://reactivex.io/documentation/operators/empty-never-throw.html](http://reactivex.io/documentation/operators/empty-never-throw.html) + +Signal an error, either pre-existing or generated via a `java.util.concurrent.Callable`, to the consumer. + +#### error example: + +```java +Observable error = Observable.error(new IOException()); + +error.subscribe( + v -> System.out.println("This should never be printed!"), + error -> error.printStackTrace(), + () -> System.out.println("This neither!")); +``` + +A typical use case is to conditionally map or suppress an exception in a chain utilizing `onErrorResumeNext`: + +```java +Observable observable = Observable.fromCallable(() -> { + if (Math.random() < 0.5) { + throw new IOException(); + } + throw new IllegalArgumentException(); +}); + +Observable result = observable.onErrorResumeNext(error -> { + if (error instanceof IllegalArgumentException) { + return Observable.empty(); + } + return Observable.error(error); +}); + +for (int i = 0; i < 10; i++) { + result.subscribe( + v -> System.out.println("This should never be printed!"), + error -> error.printStackTrace(), + () -> System.out.println("Done")); +} +```