(This is an excerpt from Modern Java: Second Edition)
RxJava is the open-source library for reactive programming that is part of the ReactiveX project. ReactiveX includes implementations in several different languages including rxjs, RxRuby, RxSwift, RxPHP, RxGroovy and many more.
RxJava 2 was rebuilt to be compatible with the Reactive Streams specification and is preferable to RxJava 1.x since it is scheduled for end-of-life. There were many changes from version 1 to 2 that could be confusing.
To avoid confusion we will focus on RxJava 2.
The basic entry class in RxJava is `io.reactivex.Flowable`.
It implements the Reactive-Streams Pattern and offers factory methods,
intermediate operators and the ability to consume reactive dataflows.
The following example demonstrates using RxJava to do a simple calculation on a range of numbers:
public static List doSquares() {
List squares = new ArrayList();
Flowable.range(1, 64) //1
.observeOn(Schedulers.computation()) //2
.map(v -> v * v) //3
.blockingSubscribe(squares::add); //4
1. Create a range from 1 to 64.
2. Call the method `observeOn` to determine which `Scheduler` to use. This determines on which Thread the flow will run.
3. The `map` method transforms each value. In this case we calculate the square.
4. Finally, we initiate the flow by calling a "subscribe" method. In this case, `blockingSubscribe` blocks until the entire flow has completed. This means that the `squares` list will be populated before the return statement. Otherwise the flow would run on a different thread and the values in the squares list would be unpredictable at any given time.
If you tie a Flowable to one Scheduler as in the previous example, it would run in succession, not in parallel.To run each calculation in parallel, you could use `flatMap` to break out each calculation into a separate Flowable as follows:
public static List doParallelSquares() {
List squares = new ArrayList();
Flowable.range(1, 64)
.flatMap(v -> //1
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.doOnError(ex -> ex.printStackTrace()) //2
.doOnComplete(() -> System.out.println("Completed")) //3
.blockingSubscribe(squares::add);
1. Call `flatMap` with a lambda expression that takes in a value and returns another Flowable.
2. Call `doOnError` to handle errors that occur.
3. Call `doOnComplete` to execute something after a Flowable has completed. This is only possible for flowables that have clear endings, such as Ranges.
For some heavy computations, you may want to run them in the background, while rendering the result in a separate thread so as not to block the UI or rendering thread. For this case, you can use the `subscribeOn` method with one Scheduler and the `observeOn` method with a different Scheduler.
public static void runComputation() throws Exception {
StringBuffer sb = new StringBuffer();
Flowable<String> source = Flowable.fromCallable(() -> { //1
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
source.doOnComplete(() -> System.out.println("Completed runComputation"));
Flowable<String> background = source.subscribeOn(Schedulers.io()); //2
Flowable<String> foreground = background.observeOn(Schedulers.single());//3
foreground.subscribe(System.out::println, Throwable::printStackTrace);//4
}
1. Create a new Flowable from a Callable (functional interface (SAM) which simply returns a value).
2. Run the Flowable using the "IO" Scheduler.
3. Observe the results of the Flowable using a single threaded Scheduler.
4. Finally, subscribe to the resulting `foreground` Flowable to initiate the flow and print the results to standard out.
For non-trivial problems, you might need to create your own `Publisher`.
For the following example, imagine you want to write to a file or read from a file using a custom Publisher in RxJava.
First, we write a range of numbers to a file using the following method:
public static void writeFile(File file) {
try (PrintWriter pw = new PrintWriter(file)) {
Flowable.range(1, 100)
.observeOn(Schedulers.newThread())
.blockingSubscribe(pw::println);
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
Here we use a try-with-resources block and `blockingSubscribe` to write the range to the file.
Second, we want to read from a file. In this example, the contents of a file
are printed to standard out using the "IO" Scheduler:
public static void readFile(File file) {
try (final BufferedReader br = new BufferedReader(new FileReader(file))) {
Flowable<String> flow = Flowable.fromPublisher(new FilePublisher(br));
flow.observeOn(Schedulers.io())
.blockingSubscribe(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
}
A Publisher implements the `subscribe` method that takes a `Subscriber`.
The `Subscriber` interface has several methods on it, the first of which to call is `onSubscribe(Subscription)`. To implement back pressure in reactive streams, the `Subscription` interface was created which has only two methods, `request(n)` for requesting the next `n` elements, and `cancel` for cancelling the subscription.
static class FilePublisher implements Publisher<String> {
BufferedReader reader;
public FilePublisher(BufferedReader reader) { this.reader = reader; }
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(
new FilePublisherSubscription(this, subscriber));
}
public String readLine() throws IOException {
return reader.readLine();
}
}
static class FilePublisherSubscription implements Subscription {
FilePublisher publisher;
Subscriber<? super String> subscriber;
public FilePublisherSubscription( FilePublisher publisher,
Subscriber<? super String> subscriber) {
this.publisher = publisher;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
try {
String line;
for (int i = 0; i < n && publisher != null
&& (line = publisher.readLine()) != null; i++) {
if (subscriber != null) subscriber.onNext(line);
}
} catch (IOException ex) {
subscriber.onError(ex);
}
subscriber.onComplete();
}
@Override
public void cancel() {
publisher = null;
}
}
This example shows how you might implement a Publisher for reading files including back-pressure support.
A similar approach could be used for any Publisher/Subscription implementation.
(This is an excerpt from Modern Java: Second Edition as it currently stands. It is available for purchase on Leanpub and is in development but should be finished within the next few months.)