Introducing RxJava
To write reactive programs, we need a library or a specific programming language, because building something like that ourselves is quite a difficult task. Java is not really a reactive programming language (it provides some tools like the java.util.Observable
class, but they are quite limited). It is a statically typed, object-oriented language, and we write a lot of boilerplate code to accomplish simple things (POJOs, for example). But there are reactive libraries in Java that we can use. In this book, we will be using RxJava (developed by people in the Java open source community, guided by Netflix).
Downloading and setting up RxJava
You can download and build RxJava from Github (https://github.com/ReactiveX/RxJava). It requires zero dependencies and supports Java 8 lambdas. The documentation provided by its Javadoc and the GitHub wiki pages is well structured and some of the best out there. Here is how to check out the project and run the build:
$ git clone [email protected]:ReactiveX/RxJava.git $ cd RxJava/ $ ./gradlew build
Of course, you can also download the prebuilt JAR. For this book, we'll be using version 1.0.8.
If you use Maven, you can add RxJava as a dependency to your pom.xml
file:
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> <version>1.0.8</version> </dependency>
Alternatively, for Apache Ivy, put this snippet in your Ivy file's dependencies:
<dependency org="io.reactivex" name="rxjava" rev="1.0.8" />
If you use Gradle instead, update your build.gradle
file's dependencies as follows:
dependencies { ... compile 'io.reactivex:rxjava:1.0.8' ... }
Note
The code examples and programs accompanying this book can be built and tested with Gradle. It can be downloaded from this Github repository: https://github.com/meddle0x53/learning-rxjava.
Now, let's take a peek at what RxJava is all about. We are going to begin with something well known, and gradually get into the library's secrets.
Comparing the iterator pattern and the RxJava Observable
As a Java programmer, it is highly possible that you've heard or used the Iterator
pattern. The idea is simple: an Iterator
instance is used to traverse through a container (collection/data source/generator), pulling the container's elements one by one when they are required, until it reaches the container's end. Here is a little example of how it is used in Java:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1) Iterator<String> iterator = list.iterator(); // (2) while(iterator.hasNext()) { // 3 // Prints elements (4) System.out.println(iterator.next()); }
Every java.util.Collection
object is an Iterable
instance which means that it has the method iterator()
. This method creates an Iterator
instance, which has as its source the collection. Let's look at what the preceding code does:
- We create a new
List
instance containing five strings. - We create an
Iterator
instance from thisList
instance, using theiterator()
method. - The
Iterator
interface has two important methods:hasNext()
andnext()
. ThehasNext()
method is used to check whether theIterator
instance has more elements for traversing. Here, we haven't begun going through the elements, so it will returnTrue
. When we go through the five strings, it will returnFalse
and the program will proceed after thewhile
loop. - The first five times, when we call the
next()
method on theIterator
instance, it will return the elements in the order they were inserted in the collection. So the strings will be printed.
In this example, our program consumes the items from the List
instance using the Iterator
instance. It pulls the data (here, represented by strings) and the current thread blocks until the requested data is ready and received. So, for example, if the Iterator
instance was firing a request to a web server on every next()
method call, the main thread of our program would be blocked while waiting for each of the responses to arrive.
RxJava's building blocks are the observables. The Observable
class (note that this is not the java.util.Observable
class that comes with the JDK) is the mathematical dual of the Iterator
class, which basically means that they are like the two sides of the same coin. It has an underlying collection or computation that produces values that can be consumed by a consumer. But the difference is that the consumer doesn't "pull" these values from the producer like in the Iterator
pattern. It is exactly the opposite; the producer 'pushes' the values as notifications to the consumer.
Here is an example of the same program but written using an Observable
instance:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); // (1) Observable<String> observable = Observable.from(list); // (2) observable.subscribe(new Action1<String>() { // (3) @Override public void call(String element) { System.out.println(element); // Prints the element (4) } });
Here is what is happening in the code:
- We create the list of strings in the same way as in the previous example.
- Then, we create an
Observable
instance from the list, using thefrom(Iterable<? extends T> iterable)
method. This method is used to create instances ofObservable
that send all the values synchronously from anIterable
instance (the list in our case) one by one to their subscribers (consumers). We'll look at how the values are sent to the subscribers one by one in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. - Here, we can subscribe to the
Observable
instance. By subscribing, we tell RxJava that we are interested in thisObservable
instance and want to receive notifications from it. We subscribe using an anonymous class implementing theAction1
interface, by defining a single method—call(T)
. This method will be called by theObservable
instance every time it has a value, ready to be pushed. Always creating newAction1
instances may seem too verbose, but Java 8 solves this verbosity. We'll learn more about that in Chapter 2, Using the Functional Constructions of Java 8. - So, every string from the source list will be pushed through to the
call()
method, and it will be printed.
Instances of the RxJava Observable
class behave somewhat like asynchronous iterators, which notify that there is a next value their subscribers/consumers by themselves. In fact, the Observable
class adds to the classic Observer
pattern (implemented in Java—see java.util.Observable
, see Design Patterns: Elements of Reusable Object-Oriented Software
by the Gang Of Four) two things available in the Iterable
type.
- The ability to signal the consumer that there is no more data available. Instead of calling the
hasNext()
method, we can attach a subscriber to listen for a 'OnCompleted
' notification. - The ability to signal the subscriber that an error has occurred. Instead of try-catching an error, we can attach an error listener to the
Observable
instance.
These listeners can be attached using the subscribe(Action1<? super T>, Action1 <Throwable>, Action0)
method. Let's expand the Observable
instance example by adding error and completed listeners:
List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); Observable<String> observable = Observable.from(list); observable.subscribe(new Action1<String>() { @Override public void call(String element) { System.out.println(element); } }, new Action1<Throwable>() { @Override public void call(Throwable t) { System.err.println(t); // (1) } }, new Action0() { @Override public void call() { System.out.println("We've finnished!"); // (2) } });
The new things here are:
- If there is an error while processing the elements, the
Observable
instance will send this error through thecall(Throwable)
method of this listener. This is analogous to the try-catch block in theIterator
instance example. - When everything finishes, this
call()
method will be invoked by theObservable
instance. This is analogous to using thehasNext()
method in order to see if the traversal over theIterable
instance has finished and printing "We've finished!".
Note
This example is available at GitHub and can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter01/ObservableVSIterator.java.
We saw how we can use the Observable
instances and that they are not so different from something familiar to us—the Iterator
instance. These Observable
instances can be used for building asynchronous streams and pushing data updates to their subscribers (they can have multiple subscribers).This is an implementation of the reactive programming paradigm. The data is being propagated to all the interested parties—the subscribers.
Coding using such streams is a more functional-like implementation of Reactive Programming. Of course, there are formal definitions and complex terms for it, but this is the simplest explanation.
Subscribing to events should be familiar; for example, clicking on a button in a GUI application fires an event which is propagated to the subscribers—handlers. But, using RxJava, we can create data streams from anything—file input, sockets, responses, variables, caches, user inputs, and so on. On top of that, consumers can be notified that the stream is closed, or that there has been an error. So, by using these streams, our applications can react to failure.
To summarize, a stream is a sequence of ongoing messages/events, ordered as they are processed in real time. It can be looked at as a value that is changing through time, and these changes can be observed by subscribers (consumers), dependent on it. So, going back to the example from Excel, we have effectively replaced the traditional variables with "reactive variables" or RxJava's Observable
instances.
Implementing the reactive sum
Now that we are familiar with the Observable
class and the idea of how to use it to code in a reactive way, we are ready to implement the reactive sum, mentioned at the beginning of this chapter.
Let's look at the requirements our program must fulfill:
- It will be an application that runs in the terminal.
- Once started, it will run until the user enters
exit
. - If the user enters
a:<number>
, the a collector will be updated to the <number>. - If the user enters
b:<number>
, the b collector will be updated to the <number>. - If the user enters anything else, it will be skipped.
- When both the a and b collectors have initial values, their sum will automatically be computed and printed on the standard output in the format a + b = <sum>. On every change in a or b, the sum will be updated and printed.
The source code contains features that we will discuss in detail in the next four chapters.
The first piece of code represents the main body of the program:
ConnectableObservable<String> input = from(System.in); // (1) Observable<Double> a = varStream("a", input); (2) Observable<Double> b = varStream("b", input); ReactiveSum sum = new ReactiveSum(a, b); (3) input.connect(); (4)
There are a lot of new things happening here:
- The first thing we must do is to create an
Observable
instance, representing the standard input stream (System.in
). So, we use thefrom(InputStream)
method (implementation will be presented in the next code snippet) to create aConnectableObservable
variable from theSystem.in
. TheConnectableObservable
variable is anObservable
instance and starts emitting events coming from its source only after itsconnect()
method is called. Read more on it in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. - We create two
Observable
instances representing thea
andb
values, using thevarStream(String, Observable)
method, which we are going to examine later. The source stream for these values is the input stream. - We create a
ReactiveSum
instance, dependent on thea
andb
values. - And now, we can start listening to the input stream.
This code is responsible for building dependencies in the program and starting it off. The a
and b
values are dependent on the user input and their sum is dependent on them.
Now let's look at the implementation of the from(InputStream)
method, which creates an Observable
instance with the java.io.InputStream
source:
static ConnectableObservable<String> from(final InputStream stream) { return from(new BufferedReader(new InputStreamReader(stream)));// (1) } static ConnectableObservable<String> from(final BufferedReader reader) { return Observable.create(new OnSubscribe<String>() { // (2) @Override public void call(Subscriber<? super String> subscriber) { if (subscriber.isUnsubscribed()) { // (3) return; } try { String line; while(!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) { // (4) if (line == null || line.equals("exit")) { // (5) break; } subscriber.onNext(line); // (6) } } catch (IOException e) { // (7) subscriber.onError(e); } if (!subscriber.isUnsubscribed()) // (8) subscriber.onCompleted(); } } }).publish(); // (9) }
This is one complex piece of code, so let's look at it step-by-step:
- This method implementation converts its
InputStream
parameter to theBufferedReader
object and to calls thefrom(BufferedReader)
method. We are doing that because we are going to use strings as data, and working with theReader
instance is easier. - So the actual implementation is in the second method. It returns an
Observable
instance, created using theObservable.create(OnSubscribe)
method. This method is the one we are going to use the most in this book. It is used to createObservable
instances with custom behavior. Therx.Observable.OnSubscribe
interface passed to it has one method,call(Subscriber)
. This method is used to implement the behavior of theObservable
instance because theSubscriber
instance passed to it can be used to emit messages to theObservable
instance's subscriber. A subscriber is the client of anObservable
instance, which consumes its notifications. Read more about that in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. - If the subscriber has already unsubscribed from this
Observable
instance, nothing should be done. - The main logic is to listen for user input, while the subscriber is subscribed. Every line the user enters in the terminal is treated as a message. This is the main loop of the program.
- If the user enters the word
exit
and hits Enter, the main loop stops. - Otherwise, the message the user entered is passed as a notification to the subscriber of the
Observable
instance, using theonNext(T)
method. This way, we pass everything to the interested parties. It's their job to filter out and transform the raw messages. - If there is an IO error, the subscribers are notified with an
OnError
notification through theonError(Throwable)
method. - If the program reaches here (through breaking out of the main loop) and the subscriber is still subscribed to the
Observable
instance, anOnCompleted
notification is sent to the subscribers using theonCompleted()
method. - With the
publish()
method, we turn the newObservable
instance intoConnectableObservable
instance. We have to do this because, otherwise, for every subscription to thisObservable
instance, our logic will be executed from the beginning. In our case, we want to execute it only once and all the subscribers to receive the same notifications; this is achievable with the use of aConnectableObservable
instance. Read more about that in Chapter 3, Creating and Connecting Observables, Observers, and Subjects.
This illustrates a simplified way to turn Java's IO streams into Observable
instances. Of course, with this main loop, the main thread of the program will block waiting for user input. This can be prevented using the right Scheduler
instances to move the logic to another thread. We'll revisit this topic in Chapter 6, Using Concurrency and Parallelism with Schedulers.
Now, every line the user types into the terminal is propagated as a notification by the ConnectableObservable
instance created by this method. The time has come to look at how we connect our value Observable
instances, representing the collectors of the sum, to this input Observable
instance. Here is the implementation of the varStream(String, Observable)
method, which takes a name of a value and source Observable
instance and returns an Observable
instance representing this value:
public static Observable<Double> varStream(final String varName, Observable<String> input) { final Pattern pattern = Pattern.compile("\\^s*" + varName + "\\s*[:|=]\\s*(-?\\d+\\.?\\d*)$"); // (1) return input .map(new Func1<String, Matcher>() { public Matcher call(String str) { return pattern.matcher(str); // (2) } }) .filter(new Func1<Matcher, Boolean>() { public Boolean call(Matcher matcher) { return matcher.matches() && matcher.group(1) != null; // (3) } }) .map(new Func1<Matcher, Double>() { public Double call(Matcher matcher) { return Double.parseDouble(matcher.group(1)); // (4) } }); }
The map()
and filter()
methods called on the Observable
instance here are part of the fluent API provided by RxJava. They can be called on an Observable
instance, creating a new Observable
instance that depends on these methods and that transforms or filters the incoming data. Using these methods the right way, you can express complex logic in a series of steps leading to your objective. Read more about this in Chapter 4, Transforming, Filtering, and Accumulating Your Data. Let's analyze the code:
- Our variables are interested only in messages in the format
<var_name>: <value>
or<var_name> = <value>
, so we are going to use this regular expression to filter and process only these kinds of messages. Remember that our inputObservable
instance sends each line the user writes; it is our job to handle it the right way. - Using the messages we receive from the input, we create a
Matcher
instance using the preceding regular expression as a pattern. - We pass through only data that matches the regular expression. Everything else is discarded.
- Here, the value to set is extracted as a
Double
number value.
This is how the values a
and b
are represented by streams of double values, changing in time. Now we can implement their sum. We implemented it as a class that implements the Observer
interface, because I wanted to show you another way of subscribing to Observable
instances—using the Observer
interface. Here is the code:
public static final class ReactiveSum implements Observer<Double> { // (1) private double sum; public ReactiveSum(Observable<Double> a, Observable<Double> b) { this.sum = 0; Observable.combineLatest(a, b, new Func2<Double, Double, Double>() { // (5) public Double call(Double a, Double b) { return a + b; } }).subscribe(this); // (6) } public void onCompleted() { System.out.println("Exiting last sum was : " + this.sum); // (4) } public void onError(Throwable e) { System.err.println("Got an error!"); // (3) e.printStackTrace(); } public void onNext(Double sum) { this.sum = sum; System.out.println("update : a + b = " + sum); // (2) } }
This is the implementation of the actual sum, dependent on the two Observable
instances representing its collectors:
- It is an
Observer
interface. TheObserver
instance can be passed to theObservable
instance'ssubscribe(Observer)
method and defines three methods that are named after the three types of notification:onNext(T)
,onError(Throwable)
, andonCompleted
. Read more about this interface in Chapter 3, Creating and Connecting Observables, Observers, and Subjects. - In our
onNext(Double)
method implementation, we set the sum to the incoming value and print an update to the standard output. - If we get an error, we just print it.
- When everything is done, we greet the user with the final sum.
- We implement the sum with the
combineLatest(Observable, Observable, Func2)
method. This method creates a newObservable
instance. The newObservable
instance is updated when any of the twoObservable
instances, passed to combineLatest receives an update. The value emitted through the newObservable
instance is computed by the third parameter—a function that has access to the latest values of the two source sequences. In our case, we sum up the values. There will be no notification until both of theObservable
instances passed to the method emit at least one value. So, we will have the sum only when botha
andb
have notifications. Read more about this method and other combiners in Chapter 5, Combinators, Conditionals, and Error Handling. - We subscribe our
Observer
instance to the combinedObservable
instance.
Here is sample of what the output of this example would look like:
Reacitve Sum. Type 'a: <number>' and 'b: <number>' to try it. a:4 b:5 update : a + b = 9.0 a:6 update : a + b = 11.0
So this is it! We have implemented our reactive sum using streams of data.
Note
The source code of this example can be downloaded and tried out from here: https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter01/ReactiveSumV1.java.