C++ Reactive Programming

Design concurrent and asynchronous applications using the RxCpp library and Modern C++17

Product type Paperback
Published in Jun 2018
Publisher Packt
ISBN-13 9781788629775
Length 348 pages
Edition 1st Edition
Authors (2): Peter Abraham, Praseed Pai
Table of Contents (15 chapters)

Preface
1. Reactive Programming Model – Overview and History
2. A Tour of Modern C++ and its Key Idioms
3. Language-Level Concurrency and Parallelism in C++
4. Asynchronous and Lock-Free Programming in C++
5. Introduction to Observables
6. Introduction to Event Stream Programming Using C++
7. Introduction to Data Flow Computation and the RxCpp Library
8. RxCpp – the Key Elements
9. Reactive GUI Programming Using Qt/C++
10. Creating Custom Operators in RxCpp
11. Design Patterns and Idioms for C++ Rx Programming
12. Reactive Microservices Using C++
13. Advanced Streams and Handling Errors
14. Other Books You May Enjoy

Pull- versus push-based reactive programming

Reactive programs can be classified as pull-based or push-based. A pull-based system waits for a demand before handing data to the requestor (the subscriber, in our case). This is the classic arrangement in which the consumer actively polls the data source for more information. It employs the iterator pattern, and the IEnumerable<T>/IEnumerator<T> interfaces are designed for exactly such scenarios, which are synchronous in nature (the application can block while pulling data). A push-based system, on the other hand, aggregates events and pushes them through a signal network to carry out the computation. Here, data and related updates are handed to the subscriber by the source (Observable sequences, in this case). The subscriber is never blocked; it simply reacts to changes as they arrive, which is what makes the model asynchronous. The push pattern is therefore a better fit for rich UI environments, where you do not want to block the main UI thread while waiting for events, and it is what keeps reactive programs responsive.
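
To make the distinction concrete, here is a minimal sketch of the same summation written against a pull-style source and a push-style source. The class and function names (PullSource, PushSource, SumByPulling, SumByPush) are hypothetical and are not taken from the book's code. Note that this push source emits synchronously inside Subscribe to keep the sketch short; a real push source would more likely deliver values from an event loop or another thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Pull: the consumer drives. It asks for the next value and waits
// until the source hands one back.
class PullSource {
    std::vector<int> data_;
    std::size_t pos_;
public:
    explicit PullSource(const std::vector<int>& data) : data_(data), pos_(0) {}
    bool HasMore() const { return pos_ < data_.size(); }
    int Next() {
        assert(HasMore());  // the caller is expected to check HasMore() first
        return data_[pos_++];
    }
};

// Push: the source drives. The consumer registers a callback once and
// is called back for every value the source has.
class PushSource {
    std::vector<int> data_;
public:
    explicit PushSource(const std::vector<int>& data) : data_(data) {}
    void Subscribe(const std::function<void(int)>& on_next) const {
        for (std::size_t i = 0; i < data_.size(); ++i) on_next(data_[i]);
    }
};

// Pull-style consumer: it owns the loop.
int SumByPulling(PullSource& source) {
    int total = 0;
    while (source.HasMore()) total += source.Next();
    return total;
}

// Push-style consumer: it only supplies the reaction to each value.
int SumByPush(const PushSource& source) {
    int total = 0;
    source.Subscribe([&total](int value) { total += value; });
    return total;
}
```

In the pull version the loop lives in the consumer; in the push version the loop lives in the source and the consumer shrinks to a callback. That inversion is the whole difference between the two styles.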

The IEnumerable/IObservable duality

If you take a closer look, there is only a subtle difference between these two patterns. IEnumerable<T> can be considered the pull-based equivalent of the push-based IObservable<T>. In fact, they are duals. When two entities exchange information, one entity's pull corresponds to another entity pushing the information. This duality is illustrated in the following diagram:

Let's understand this duality by looking at this sample code, a number sequence generator:

We have striven to use classic C++ constructs to write programs for this particular chapter as there are chapters on Modern C++ language features, language level concurrency, lock-free programming, and related topics for implementing Reactive constructs in Modern C++.
#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class ConcreteEnumberable : public IEnumerable<int>
{
    int *numberlist, _count;
public:
    ConcreteEnumberable(int numbers[], int count)
        : numberlist(numbers), _count(count) {}
    ~ConcreteEnumberable() {}

    // Nested enumerator: walks the array held by the enclosing class
    class Enumerator : public IEnumerator<int>
    {
        int *inumbers, icount, index;
    public:
        Enumerator(int *numbers, int count)
            : inumbers(numbers), icount(count), index(0) {}
        bool HasMore() { return index < icount; }
        //---------- Ideally, next() should throw an exception;
        //---------- instead, it just returns -1 once the bound
        //---------- has been reached
        int next() { return (index < icount) ? inumbers[index++] : -1; }
        ~Enumerator() {}
    };
    IEnumerator<int> *GetEnumerator()
    { return new Enumerator(numberlist, _count); }
};

The preceding class takes an array of integers as a parameter, and because it implements the IEnumerable<T> interface, we can enumerate over its elements. The enumeration logic lives in the nested Enumerator class, which implements the IEnumerator<T> interface. The following main function exercises it:

int main()
{
    int x[] = { 1,2,3,4,5 };
    //-------- Raw pointers are used on purpose here, as we have
    //-------- not introduced unique_ptr, shared_ptr, weak_ptr yet.
    //-------- Using auto_ptr would be confusing; otherwise we would
    //-------- need the Boost library here (overkill)
    ConcreteEnumberable *t = new ConcreteEnumberable(x, 5);
    IEnumerator<int> *numbers = t->GetEnumerator();
    while (numbers->HasMore())
        cout << numbers->next() << endl;
    delete numbers;
    delete t;
    return 0;
}

The main program instantiates the ConcreteEnumberable class and walks through each element by pulling values from its enumerator.
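
The listing depends on IEnumerable<T> and IEnumerator<T> from Common2.h, which is not reproduced in this section. As a rough sketch only, the pull-side interfaces might look like the following, assuming the member names used above (GetEnumerator, HasMore, next). The virtual destructors, the ArrayEnumerable class, and the Sum helper are illustrative additions rather than the book's code, and std::unique_ptr is used here only to avoid the manual delete calls.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for the interfaces declared in Common2.h.
template <typename T>
class IEnumerator {
public:
    virtual bool HasMore() = 0;
    virtual T next() = 0;
    // Virtual destructor so that deleting through the interface is safe.
    virtual ~IEnumerator() {}
};

template <typename T>
class IEnumerable {
public:
    // The caller takes ownership of the returned enumerator.
    virtual IEnumerator<T>* GetEnumerator() = 0;
    virtual ~IEnumerable() {}
};

// Illustrative pull source over a caller-owned array.
class ArrayEnumerable : public IEnumerable<int> {
    const int* items_;
    int count_;

    class Enumerator : public IEnumerator<int> {
        const int* items_;
        int count_;
        int index_;
    public:
        Enumerator(const int* items, int count)
            : items_(items), count_(count), index_(0) {}
        bool HasMore() { return index_ < count_; }
        int next() {
            assert(index_ < count_);  // reading past the end is a caller bug
            return items_[index_++];
        }
    };

public:
    ArrayEnumerable(const int* items, int count)
        : items_(items), count_(count) {}
    IEnumerator<int>* GetEnumerator() { return new Enumerator(items_, count_); }
};

// Generic consumer: pulls values until the source is exhausted.
template <typename T>
T Sum(IEnumerable<T>& source) {
    std::unique_ptr<IEnumerator<T> > it(source.GetEnumerator());
    T total = T();
    while (it->HasMore()) total += it->next();
    return total;
}
```

Because every call to GetEnumerator returns a fresh cursor, the same source can be enumerated more than once, which is why calling Sum twice on one ArrayEnumerable gives the same result both times.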

We will write an even number sequence generator to demonstrate how these data types work together in converting a pull-based program into a push-based one. Robustness is given low priority to keep the listing terse:

#include "stdafx.h"   // MSVC precompiled header
#include <iostream>
#include <vector>
#include <iterator>
#include <memory>
#include "../Common2.h"
using namespace std;

class EvenNumberObservable : public IObservable<int> {
    int *_numbers, _count;
public:
    EvenNumberObservable(int numbers[], int count)
        : _numbers(numbers), _count(count) {}
    bool Subscribe(IObserver<int>& observer) {
        // Push every even number to the observer as it is found
        for (int i = 0; i < _count; ++i) {
            if (_numbers[i] % 2 == 0)
                observer.OnNext(_numbers[i]);
        }
        // Signal the end of the sequence exactly once
        observer.OnCompleted();
        return true;
    }
};

The preceding program takes an array of integers, filters out the odd numbers, and notifies the IObserver<T> whenever an even integer is encountered. In this case, the data source is pushing data to the observer. The implementation of IObserver<T> is given as follows:

class SimpleObserver : public IObserver<int> {
public:
    void OnNext(int value) { cout << value << endl; }
    void OnCompleted() { cout << "hello completed" << endl; }
    void OnError(CustomException *ex) {}
};

The SimpleObserver class implements the IObserver<T> interface and it has the capability to receive notifications and react to them:

int main()
{
    int x[] = { 1,2,3,4,5 };
    EvenNumberObservable *t = new EvenNumberObservable(x, 5);
    IObserver<int> *xy = new SimpleObserver();
    t->Subscribe(*xy);
    delete xy;
    delete t;
    return 0;
}

The preceding example shows how naturally one can subscribe to the even numbers in an Observable sequence of natural numbers. The system pushes (publishes) each value to the observer (subscriber) as soon as an even number is detected. The code gives explicit implementations of the key interfaces so that you can understand, or at least reason about, what happens under the hood.
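
The duality can also be shown in a single step by bridging the two sides. The following sketch drains a pull-style enumerator and pushes each value into an observer. The interfaces are hypothetical stand-ins for those in Common2.h (which is not shown in this section): OnError here takes a std::exception reference rather than the CustomException pointer used above, and VectorEnumerator, CollectingObserver, and PushAll are illustrative names that do not appear in the book.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical pull-side interface, modelled on the names used in the text.
template <typename T>
class IEnumerator {
public:
    virtual bool HasMore() = 0;
    virtual T next() = 0;
    virtual ~IEnumerator() {}
};

// Hypothetical push-side interface; OnError uses std::exception here.
template <typename T>
class IObserver {
public:
    virtual void OnNext(T value) = 0;
    virtual void OnCompleted() = 0;
    virtual void OnError(const std::exception& error) = 0;
    virtual ~IObserver() {}
};

// Pull source over a copy of a vector; throws instead of returning -1.
class VectorEnumerator : public IEnumerator<int> {
    std::vector<int> items_;
    std::size_t index_;
public:
    explicit VectorEnumerator(const std::vector<int>& items)
        : items_(items), index_(0) {}
    bool HasMore() { return index_ < items_.size(); }
    int next() {
        if (!HasMore()) throw std::out_of_range("enumerator exhausted");
        return items_[index_++];
    }
};

// Observer that records what it receives, which makes it easy to inspect.
class CollectingObserver : public IObserver<int> {
public:
    std::vector<int> values;
    bool completed;
    bool failed;
    CollectingObserver() : completed(false), failed(false) {}
    void OnNext(int value) {
        assert(!completed && !failed);  // no values after a terminal event
        values.push_back(value);
    }
    void OnCompleted() { completed = true; }
    void OnError(const std::exception&) { failed = true; }
};

// The bridge: every pull on the source becomes a push to the observer.
// Exactly one terminal notification (OnCompleted or OnError) is sent.
template <typename T>
void PushAll(IEnumerator<T>& source, IObserver<T>& observer) {
    try {
        while (source.HasMore()) observer.OnNext(source.next());
    } catch (const std::exception& error) {
        observer.OnError(error);
        return;
    }
    observer.OnCompleted();
}
```

Calling PushAll with a VectorEnumerator over {2, 4, 6} and a CollectingObserver leaves the three values in the observer's values vector with completed set to true. The consumer never asks for data; the adapter does the pulling on its behalf.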
