Reactive programming
Reactive programming is yet another programming paradigm. It is based around the ability to easily express data flows and the automatic propagation of changes.
Let's explore this in more depth:
- Data flows (or data streams): In reactive programming, we want to think about variables as "values that change over time". For example, this could be a mouse position, user click or data coming via WebSockets. Basically, any event-based system can be considered a data stream.
- Propagation of change: A very nice example is a spreadsheet editor. If we set the value of a single cell to
A1 = A2 + A3
, this means that every change to cellsA2
andA3
will be propagated toA1
. In programmers' speech, this corresponds to the observer design pattern whereA2
andA3
are observables andA1
is an observer. We'll talk about the observer pattern again later in this chapter. - Easily express data flows: This is related mostly to libraries we use rather than to the language itself. It means that, if we want to use reactive programming effectively, we need to be able to manipulate data streams easily. This principle also suggests that reactive programming falls under the category of declarative paradigms.
As we can see, the definition is very broad.
The first part about data flows and propagation of change looks like the observer design pattern with iterables. Expressing data flows with ease could be done with functional programming. This all basically describes what we've already seen in this chapter.
The main differences to the observer pattern are how we think and manipulate with data streams. In previous examples, we always worked with arrays as inputs, which are synchronous, while data streams can be both synchronous and asynchronous. From our point of view, it doesn't matter.
Let's see what a typical implementation of the observer pattern might look like in PHP:
// observer_01.php class Observable { /** @var Observer[] */ private $observers = []; private $id; static private $total = 0; public function __construct() { $this->id = ++self::$total; } public function registerObserver(Observer $observer) { $this->observers[] = $observer; } public function notifyObservers() { foreach ($this->observers as $observer) { $observer->notify($this, func_get_args()); } } public function __toString() { return sprintf('Observable #%d', $this->id); } }
In order to be notified about any changes made by the Observable, we need another class called Observer
that subscribes to an Observable:
// observer_01.php class Observer { static private $total = 0; private $id; public function __construct(Observable $observable) { $this->id = ++self::$total; $observable->registerObserver($this); } public function notify($obsr, $args) { $format = "Observer #%d got "%s" from %s\n"; printf($format, $this->id, implode(', ', $args), $obsr); } }
Then, a typical usage might look like the following:
$observer1 = new Observer($subject); $observer2 = new Observer($subject); $subject->notifyObservers('test');
This example will print two messages to the console:
$ php observer_01.php // Observer #1 got "test" from Observable #1 // Observer #2 got "test" from Observable #1
This almost follows how we defined the reactive programming paradigm. A data stream is a sequence of events coming from an Observable, and changes are propagated to all listening observers. The last point we mentioned above - being able to easily express data flows - isn't really there. What if we wanted to filter out all events that don't match a particular condition, just like we did in the examples with array_filter()
and functional programming? This logic would have to go into each Observer
class implementation.
The principles of reactive programming are actually very common in some libraries. We'll have a look at three of them and see how these relate to what we've just learned about reactive and functional programming.
jQuery Promises
Probably every web developer has used jQuery at some point. A very handy way of avoiding so-called callback hell is using Promises when dealing with asynchronous calls. For example, calling jQuery.ajax()
returns a Promise
object that is resolved or rejected when the AJAX call has finished:
$.get('/foo/bar').done(response => { // ... }).fail(response => { // ... }).complete(response => { // ... });
A Promise
object represents a value in the future. It's non-blocking (asynchronous), but lets us handle it in a declarative approach.
Another useful use case is chaining callbacks, forming a chain, where each callback can modify the value before propagating it further:
// promises_01.js function functionReturningAPromise() { var d = $.Deferred(); setTimeout(() => d.resolve(42), 0); return d.promise(); } functionReturningAPromise() .then(value => value + 1) .then(value => 'result: ' + value) .then(value => console.log(value));
In this example, we have a single source which is the functionReturningAPromise()
call, and three callbacks where only the last one prints the value that resolved the Promise. We can see that the number 42
was modified twice when going through the chain of callbacks:
$ node promises_01.js result: 43
Note
In reactive programming, we'll use a very similar approach to Promises, but while a Promise
object is always resolved only once (it carries just one value); data streams can generate multiple or even an infinite number of values.
Gulp streaming build system
The Gulp build system has become the most popular build system in JavaScript. It's completely based on streams and manipulating them. Consider the following example:
gulp.src('src/*.js') .pipe(concat('all.min.js')) .pipe(gulp.dest('build'));
This creates a stream of files that match the predicate src/*.js
, concats all of them together and finally writes one single file to build/all.min.js
. Does this remind you of anything?
This is the same declarative and functional approach we used above, when talking about functional programming in PHP. In particular, this concat()
function could be replaced with PHP's array_reduce()
.
Streams in gulp (aka vinyl-source-stream) can be modified in any way we want. We can, for example, split a stream into two new streams:
var filter = require('gulp-filter'); var stream = gulp.src('src/*.js'); var substream1 = stream.pipe(filter(['*.min.js'])); var substream2 = stream.pipe(filter(['!/app/*']));
Or, we can merge two streams and uglify (minify and obfuscate the source code) into one stream:
var merge = require('merge2'); merge(gulp.src('src/*.js'), gulp.src('vendor/*')) .pipe(uglify()); .pipe(gulp.dest('build'));
This stream manipulation corresponds very well to the last concept we used to define the reactive programming paradigm - express data flows with ease - while it's both functional and declarative.
EventDispatcher component in PHP
Probably every PHP framework comes with some type of event-driven component to notify various different parts of an application using events.
One such component comes with the Symfony framework out-of-the-box ( https://github.com/symfony/event-dispatcher ). It's an independent component that allows subscribing and listening to events (the observer pattern).
Event listeners can be later grouped by the events they subscribe to and can also be assigned custom tags, as shown in the following code:
use Symfony\Component\EventDispatcher\EventDispatcher; $dispatcher = new EventDispatcher(); $listener = new AcmeListener(); $dispatcher->addListener('event_name', [$listener, 'action']);
This principle is very similar to Zend\EventManager used in Zend Framework. It is just another variation of the Observable - observer combination.
We'll come back to Symfony EventDispatcher
component in
Chapter 4
, Reactive vs a Typical Event-Driven approach, where we'll explore how to apply the reactive programming approach to event-based systems, which should lead to simplification and better-organized code.