ReactiveX, or Rx for short, is an API for programming with observable event streams. RxPY is a port of ReactiveX to Python. Learning Rx with Python is particularly interesting since Python removes much of the clutter that comes with statically typed languages. RxPY works with both Python 2 and Python 3, but all examples in this tutorial use Python 3.4.
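Rx is about processing streams of events. With Rx you:
1. Tell what you want to process (Observable)
2. How you want to process it (a composition of operators)
3. What you want to do with the result (Observer)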
It's important to understand that with Rx you describe what you want to do with events if and when they arrive. It's all a declarative composition of operators that will do some processing of the events when they arrive. If nothing happens, then nothing is processed.
Thus the pattern is that you subscribe to an Observable using an Observer:
subscription = Observable.subscribe(observer)
NOTE: Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen.
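To make the note above concrete, here is a minimal sketch in plain Python of why nothing happens before subscribing. It is not how RxPY is implemented; TinyObservable and produce are hypothetical names made up for this illustration.

```python
class TinyObservable:
    """Hypothetical stand-in for an Observable: it only stores a recipe."""

    def __init__(self, produce):
        # produce(on_next, on_completed) pushes the events when it is called
        self._produce = produce

    def subscribe(self, on_next, on_completed=lambda: None):
        # The recipe runs here, once for every subscription
        self._produce(on_next, on_completed)


log = []


def produce(on_next, on_completed):
    log.append("producer started")
    for value in range(3):
        on_next(value)
    on_completed()


xs = TinyObservable(produce)
log_before_subscribe = list(log)  # still empty: creating xs ran nothing

xs.subscribe(lambda x: log.append("Got: %s" % x),
             lambda: log.append("Sequence completed"))
print(log)
```

Creating xs only stores the producer function. The producer runs when subscribe() is called, which is the behaviour the note describes.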
In :
%%bash
pip install rx
Requirement already satisfied (use --upgrade to upgrade): rx in /Users/dbrattli/GitHub/RxPY
In :
import rx
from rx import Observable, Observer
In :
class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)

    def on_error(self, e):
        print("Got error: %s" % e)

    def on_completed(self):
        print("Sequence completed")

xs = Observable.from_iterable(range(10))
d = xs.subscribe(MyObserver())

Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed
In :
xs = Observable.from_(range(10))
d = xs.subscribe(print)

0
1
2
3
4
5
6
7
8
9
NOTE: The subscribe method takes an observer, or one to three callbacks for handling on_next(), on_error(), and on_completed(). This is why we can use print directly as the observer in the example above: it becomes the on_next() handler for an anonymous observer.
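The idea of an anonymous observer can be sketched in plain Python as follows. This is only an illustration of the concept, not RxPY's actual code; AnonymousObserver here is a simplified stand-in and as_observer is a hypothetical helper.

```python
class AnonymousObserver:
    """Simplified stand-in: wraps up to three callbacks as an observer."""

    def __init__(self, on_next=None, on_error=None, on_completed=None):
        # Missing callbacks are replaced with handlers that do nothing
        self.on_next = on_next or (lambda value: None)
        self.on_error = on_error or (lambda error: None)
        self.on_completed = on_completed or (lambda: None)


def as_observer(observer_or_on_next=None, on_error=None, on_completed=None):
    """Hypothetical helper: accept either an observer or plain callbacks."""
    if hasattr(observer_or_on_next, "on_next"):
        return observer_or_on_next  # already an observer, use it as is
    return AnonymousObserver(observer_or_on_next, on_error, on_completed)


received = []
observer = as_observer(received.append)  # a single callback becomes on_next
for value in range(3):
    observer.on_next(value)
observer.on_completed()  # default handler, does nothing
print(received)
```

Passing print instead of received.append would print each value, which mirrors what happens in the cell above.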
In :
xs = Observable.from_(range(10))
d = xs.filter(
    lambda x: x % 2
).subscribe(print)

1
3
5
7
9
In :
xs = Observable.from_(range(10))
d = xs.map(
    lambda x: x * 2
).subscribe(print)

0
2
4
6
8
10
12
14
16
18
NOTE: You can also take an index as the second parameter to the mapper function:
In :
xs = Observable.from_(range(10, 20, 2))
d = xs.map(
    lambda x, i: "%s: %s" % (i, x * 2)
).subscribe(print)

0: 20
1: 24
2: 28
3: 32
4: 36
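The filter and map cells above each return a new observable that wraps the previous one. A rough plain-Python sketch of that composition is shown here, where an "observable" is simply a function that takes an on_next callback. The names from_iterable, filter_ and map_indexed are hypothetical and are not RxPY functions.

```python
def from_iterable(values):
    def subscribe(on_next):
        for value in values:
            on_next(value)
    return subscribe


def filter_(source, predicate):
    def subscribe(on_next):
        # Forward only the values that pass the predicate
        source(lambda x: on_next(x) if predicate(x) else None)
    return subscribe


def map_indexed(source, mapper):
    def subscribe(on_next):
        index = 0

        def forward(x):
            nonlocal index
            on_next(mapper(x, index))  # the index counts events reaching this step
            index += 1

        source(forward)
    return subscribe


results = []
pipeline = map_indexed(
    filter_(from_iterable(range(10)), lambda x: x % 2),
    lambda x, i: "%s: %s" % (i, x * 2),
)
pipeline(results.append)  # nothing is computed until this call
print(results)
```

Each operator only describes a processing step. The values flow through the whole chain one at a time once the final callback is supplied.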
In :
xs = Observable.range(1, 5)
ys = Observable.from_("abcde")
zs = xs.merge(ys).subscribe(print)

a
1
b
2
c
3
d
4
e
5
In the examples above all the events happen at the same moment in time. The events are only separated by ordering. This confuses many newcomers to Rx since the merge operation above may have several valid results such as:
a1b2c3d4e5
1a2b3c4d5e
ab12cd34e5
abcde12345
The only guarantee you have is that 1 will be before 2 in xs, but 1 in xs can be before or after a in ys. It's up to the sort stability of the scheduler to decide which event should go first. For real-time data streams this will not be a problem since the events will be separated by actual time. To make sure you get the results you "expect", it's always a good idea to add some time between the events when playing with Rx.
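The ordering issue can be illustrated without Rx at all. The sketch below models each event as a (time_ms, value) pair and merges two streams with Python's stable sorted(). The function merge_by_time and the chosen timestamps are assumptions made up for this illustration; a real scheduler works differently.

```python
def merge_by_time(*streams):
    """Merge lists of (time_ms, value) events, ordered by timestamp only."""
    events = [event for stream in streams for event in stream]
    # sorted() is stable: events with equal timestamps keep their input order
    return [value for _, value in sorted(events, key=lambda event: event[0])]


# All events at the same moment: the result depends on tie-breaking alone
xs_same = [(0, n) for n in range(1, 6)]
ys_same = [(0, c) for c in "abcde"]
same_xy = merge_by_time(xs_same, ys_same)
same_yx = merge_by_time(ys_same, xs_same)

# Events separated by time: the result no longer depends on argument order
xs_timed = [(100 * i + 50, n) for i, n in enumerate(range(1, 6))]
ys_timed = [(100 * i, c) for i, c in enumerate("abcde")]
timed_xy = merge_by_time(xs_timed, ys_timed)
timed_yx = merge_by_time(ys_timed, xs_timed)

print(same_xy, same_yx)
print(timed_xy, timed_yx)
```

With identical timestamps the two calls give different but equally valid orderings. Once the events are spread out in time, both calls agree.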
As we saw in the previous section it's nice to add some time when playing with Rx and RxPY. A great way to explore RxPY is to use the marbles test module that enables us to play with marble diagrams. The marbles module adds two new extension methods to Observable. The methods are from_marbles() and to_marbles(). Examples:
res = rx.Observable.from_marbles("1-2-3-|")
res = rx.Observable.from_marbles("1-2-3-x", rx.Scheduler.timeout)
The marble string consists of some special characters:
- = Timespan of 100 ms
x = on_error()
| = on_completed()
All other characters are treated as an on_next() event at the moment they are found in the string. If you need to represent multi-character values, you can group them with brackets, such as "1-(42)-3".
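The rules above can be sketched as a small parser. This is a minimal illustration of the marble syntax as described here, not the parser used by RxPY. The function parse_marbles is hypothetical, values are kept as strings, and it assumes that only "-" advances the clock.

```python
def parse_marbles(marbles, step_ms=100):
    """Turn a marble string into a list of (time_ms, kind, value) tuples."""
    events = []
    time_ms = 0
    i = 0
    while i < len(marbles):
        ch = marbles[i]
        if ch == "-":
            time_ms += step_ms  # assumption: only "-" moves time forward
        elif ch == "x":
            events.append((time_ms, "on_error", None))
        elif ch == "|":
            events.append((time_ms, "on_completed", None))
        elif ch == "(":
            end = marbles.index(")", i)  # grouped multi-character value
            events.append((time_ms, "on_next", marbles[i + 1:end]))
            i = end
        else:
            events.append((time_ms, "on_next", ch))
        i += 1
    return events


grouped = parse_marbles("1-(42)-3-|")
failing = parse_marbles("1-2-3-x")
print(grouped)
print(failing)
```

For simplicity the sketch does not stop at the first on_error() or on_completed(); it just records every character it finds.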
Let's try it out:
In :
from rx.testing import marbles

xs = Observable.from_marbles("a-b-c-|")
xs.to_blocking().to_marbles()
It's now also easy to add errors to the event stream by inserting x into the marble string:
In :
xs = Observable.from_marbles("1-2-3-x-5")
ys = Observable.from_marbles("1-2-3-4-5")
xs.merge(ys).to_blocking().to_marbles()
A simple way to create an observable stream is to use a subject. It's probably called a subject after the Subject-Observer pattern described in the Design Patterns book by the Gang of Four (GoF).
Anyway, a Subject is both an Observable and an Observer, so you can both subscribe to it and on_next it with events. This makes it an obvious candidate if you need to publish values into an observable stream for processing:
In :
from rx.subjects import Subject

stream = Subject()
stream.on_next(41)

d = stream.subscribe(lambda x: print("Got: %s" % x))

stream.on_next(42)

d.dispose()
stream.on_next(43)
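To see why only the value pushed between subscribe() and dispose() reaches the handler, here is a minimal plain-Python sketch of a subject. TinySubject and TinyDisposable are hypothetical stand-ins written for this illustration and leave out most of what a real Subject does (errors, completion, thread safety).

```python
class TinyDisposable:
    """Hypothetical stand-in for the object returned by subscribe()."""

    def __init__(self, action):
        self._action = action

    def dispose(self):
        self._action()


class TinySubject:
    """Hypothetical stand-in: both pushes events and accepts subscribers."""

    def __init__(self):
        self._handlers = []

    def subscribe(self, on_next):
        self._handlers.append(on_next)

        def unsubscribe():
            if on_next in self._handlers:
                self._handlers.remove(on_next)

        return TinyDisposable(unsubscribe)

    def on_next(self, value):
        # Deliver to whoever is subscribed right now; nothing is buffered
        for handler in list(self._handlers):
            handler(value)


received = []
stream = TinySubject()
stream.on_next(41)  # no subscribers yet, so this value is lost

d = stream.subscribe(received.append)
stream.on_next(42)  # delivered

d.dispose()
stream.on_next(43)  # subscriber removed, so this value is lost too
print(received)
```

The sketch mirrors the sequence of calls in the cell above: 41 is pushed before anyone listens and 43 after the subscription is disposed.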
That's all for now