ReactiveX, or Rx for short, is an API for programming with observable event streams. RxPY is a port of ReactiveX to Python. Learning Rx with Python is particularly interesting since Python removes much of the clutter that comes with statically typed languages. RxPY works with both Python 2 and Python 3 but all examples in this tutorial uses Python 3.4.
Rx is about processing streams of events. With Rx you:
It's important to understand that with Rx you describe what you want to do with events if and when they arrive. It's all a declarative composition of operators that will do some processing the events when they arrive. If nothing happens, then nothing is processed.
Thus the pattern is that you subscribe
to an Observable
using an Observer
:
subscription = Observable.subscribe(observer)
NOTE: Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen.
In [41]:
%%bash
pip install rx
In [4]:
import rx
from rx import Observable, Observer
In [5]:
class MyObserver(Observer):
def on_next(self, x):
print("Got: %s" % x)
def on_error(self, e):
print("Got error: %s" % e)
def on_completed(self):
print("Sequence completed")
xs = Observable.from_iterable(range(10))
d = xs.subscribe(MyObserver())
In [6]:
xs = Observable.from_(range(10))
d = xs.subscribe(print)
NOTE: The subscribe method takes an observer, or one to three callbacks for handing on_next()
, on_error()
, and on_completed()
. This is why we can use print
directly as the observer in the example above, since it becomes the on_next()
handler for an anonymous observer.
In [4]:
xs = Observable.from_(range(10))
d = xs.filter(
lambda x: x % 2
).subscribe(print)
In [7]:
xs = Observable.from_(range(10))
d = xs.map(
lambda x: x * 2
).subscribe(print)
NOTE: You can also take an index as the second parameter to the mapper function:
In [8]:
xs = Observable.from_(range(10, 20, 2))
d = xs.map(
lambda x, i: "%s: %s" % (i, x * 2)
).subscribe(print)
In [6]:
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)
In the examples above all the events happen at the same moment in time. The events are only separated by ordering. This confuses many newcomers to Rx since the result of the merge
operation above may have several valid results such as:
a1b2c3d4e5
1a2b3c4d5e
ab12cd34e5
abcde12345
The only guarantee you have is that 1 will be before 2 in xs
, but 1 in xs
can be before or after a
in ys
. It's up the the sort stability of the scheduler to decide which event should go first. For real time data streams this will not be a problem since the events will be separated by actual time. To make sure you get the results you "expect", it's always a good idea to add some time between the events when playing with Rx.
As we saw in the previous section it's nice to add some time when playing with Rx and RxPY. A great way to explore RxPY is to use the marbles
test module that enables us to play with marble diagrams. The marbles module adds two new extension methods to Observable
. The methods are from_marbles()
and to_marbles()
.
Examples:
res = rx.Observable.from_marbles("1-2-3-|")
res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)
The marble string consists of some special characters:
- = Timespan of 100 ms
x = on_error()
| = on_completed()
All other characters are treated as an on_next()
event at the given moment they are found on the string. If you need to represent multi character values, then you can group then with brackets such as "1-(42)-3".
Lets try it out:
In [8]:
from rx.testing import marbles
xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()
Out[8]:
It's now easy to also add errors into the even stream by inserting x
into the marble string:
In [9]:
xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()
Out[9]:
A simple way to create an observable stream is to use a subject. It's probably called a subject after the Subject-Observer pattern described in the Design Patterns book by the gang of four (GOF).
Anyway, a Subject is both an Observable
and an Observer
, so you can both subscribe to it and on_next
it with events. This makes it an obvious candidate if need to publish values into an observable stream for processing:
In [9]:
from rx.subject import Subject
stream = Subject()
stream.on_next(41)
d = stream.subscribe(lambda x: print("Got: %s" % x))
stream.on_next(42)
d.dispose()
stream.on_next(43)
That's all for now
In [ ]: