Getting Started with RxPY

ReactiveX, or Rx for short, is an API for programming with observable event streams. RxPY is a port of ReactiveX to Python. Learning Rx with Python is particularly interesting since Python removes much of the clutter that comes with statically typed languages. RxPY works with both Python 2 and Python 3 but all examples in this tutorial uses Python 3.4.

Rx is about processing streams of events. With Rx you:

  • Tell what you want to process (Observable)
  • How you want to process it (A composition of operators)
  • What you want to do with the result (Observer)

It's important to understand that with Rx you describe what you want to do with events if and when they arrive. It's all a declarative composition of operators that will do some processing the events when they arrive. If nothing happens, then nothing is processed.

Thus the pattern is that you subscribe to an Observable using an Observer:

subscription = Observable.subscribe(observer)

NOTE: Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen.

Install

Use pip to install RxPY:


In [41]:
%%bash
pip install rx


Requirement already satisfied (use --upgrade to upgrade): rx in /Users/dbrattli/GitHub/RxPY

Importing the Rx module


In [4]:
import rx
from rx import Observable, Observer

Generating a sequence

There are many ways to generate a sequence of events. The easiest way to get started is to use the from_iterable() operator that is also called just from_. Other operators you may use to generate a sequence such as just, generate, create and range.


In [5]:
class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
        
    def on_error(self, e):
        print("Got error: %s" % e)
        
    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(10))
d = xs.subscribe(MyObserver())


Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed

In [6]:
xs = Observable.from_(range(10))
d = xs.subscribe(print)


0
1
2
3
4
5
6
7
8
9

NOTE: The subscribe method takes an observer, or one to three callbacks for handing on_next(), on_error(), and on_completed(). This is why we can use print directly as the observer in the example above, since it becomes the on_next() handler for an anonymous observer.

Filtering a sequence


In [4]:
xs = Observable.from_(range(10))
d = xs.filter(
        lambda x: x % 2
    ).subscribe(print)


1
3
5
7
9

Transforming a sequence


In [7]:
xs = Observable.from_(range(10))
d = xs.map(
        lambda x: x * 2
    ).subscribe(print)


0
2
4
6
8
10
12
14
16
18

NOTE: You can also take an index as the second parameter to the mapper function:


In [8]:
xs = Observable.from_(range(10, 20, 2))
d = xs.map(
        lambda x, i: "%s: %s" % (i, x * 2)
    ).subscribe(print)


0: 20
1: 24
2: 28
3: 32
4: 36

Merge

Merging two observable sequences into a single observable sequence using the merge operator:


In [6]:
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)


a
1
b
2
c
3
d
4
e
5

The Spacetime of Rx

In the examples above all the events happen at the same moment in time. The events are only separated by ordering. This confuses many newcomers to Rx since the result of the merge operation above may have several valid results such as:

a1b2c3d4e5
1a2b3c4d5e
ab12cd34e5
abcde12345

The only garantie you have is that 1 will be before 2 in xs, but 1 in xs can be before or after a in ys. It's up the the sort stability of the scheduler to decide which event should go first. For real time data streams this will not be a problem since the events will be separated by actual time. To make sure you get the results you "expect", it's always a good idea to add some time between the events when playing with Rx.

Marbles and Marble Diagrams

As we saw in the previous section it's nice to add some time when playing with Rx and RxPY. A great way to explore RxPY is to use the marbles test module that enables us to play with marble diagrams. The marbles module adds two new extension methods to Observable. The methods are from_marbles() and to_marbles().

Examples:

  1. res = rx.Observable.from_marbles("1-2-3-|")
  2. res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)

The marble string consists of some special characters:

    - = Timespan of 100 ms
    x = on_error()
    | = on_completed()

All other characters are treated as an on_next() event at the given moment they are found on the string. If you need to represent multi character values, then you can group then with brackets such as "1-(42)-3".

Lets try it out:


In [8]:
from rx.testing import marbles

xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()


Out[8]:
'a-b-c-|'

It's now easy to also add errors into the even stream by inserting x into the marble string:


In [9]:
xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()


Out[9]:
'11-22-33-4x'

Subjects and Streams

A simple way to create an observable stream is to use a subject. It's probably called a subject after the Subject-Observer pattern described in the Design Patterns book by the gang of four (GOF).

Anyway, a Subject is both an Observable and an Observer, so you can both subscribe to it and on_next it with events. This makes it an obvious candidate if need to publish values into an observable stream for processing:


In [9]:
from rx.subjects import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))

stream.on_next(42)

d.dispose()
stream.on_next(43)


Got: 42

That's all for now


In [ ]: