
  • An API for asynchronous programming with observable streams

  • ReactiveX, or Rx for short, is an API for programming with observable event streams.

  • The Observer pattern done right.

  • ReactiveX is a combination of the best ideas from the

    • Observer pattern,
    • The Iterator pattern, and
    • Functional programming


from ipywidgets import interact, interactive, fixed
import ipywidgets as widgets

import rx

<module 'rx' from 'E:\\Miniconda3\\lib\\site-packages\\rx\\__init__.py'>

from rx import Observable, Observer

Rx is about processing streams of events. With Rx you:

  • Tell what you want to process (Observable)
  • How you want to process it (A composition of operators)
  • What you want to do with the result (Observer)

It's important to understand that with Rx you describe what you want to do with events if and when they arrive.

It's all a

  • Declarative composition of operators

  • Will do some processing on the events when they arrive.

  • If nothing happens, then nothing is processed.

Thus the pattern is that you subscribe to an Observable using an Observer:

subscription = Observable.subscribe(observer)

NOTE: Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen.

Generating a Sequence

class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
    def on_error(self, e):
        print("Got ERROR: %s" % x)
    def on_completed(self):
        print("Sequence completed")

subject = Observable.from_iterable(range(10))

<rx.core.anonymousobservable.AnonymousObservable at 0x1bfebbd080>

observer = MyObserver()

<__main__.MyObserver at 0x1bfebbd160>

Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed
  • The subscribe method takes an observer, or one to three callbacks for handing on_next(), on_error(), and on_completed().
  • This is why we can use print directly as the observer in the example above, since it becomes the on_next() handler for an anonymous observer

Interactive Rx - Generating

def interactiveObservable(count):
    print("-" * 30)
    sequence = range(count)
    subject = Observable.from_iterable(sequence)
    observer = MyObserver()

countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

interact(interactiveObservable, count=countSlider)

Got: 0
Got: 1
Got: 2
Sequence completed
<function __main__.interactiveObservable>

from functools import partial

def interactiveSequenceObservable(observer, count):
    print("-" * 30)

    sequence = range(count)
    subject = Observable.from_iterable(sequence)


Python Library: functools

countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

callback = partial(interactiveSequenceObservable, MyObserver())
callback.__name__ = interactiveSequenceObservable.__name__

interact(callback, count=countSlider)

Got: 0
Got: 1
Got: 2
Sequence completed
functools.partial(<function interactiveSequenceObservable at 0x0000001BFEBB6F28>, <__main__.MyObserver object at 0x0000001BFEBB0B38>)

countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

callback = partial(interactiveSequenceObservable, print)
callback.__name__ = interactiveSequenceObservable.__name__

interact(callback, count=countSlider)

functools.partial(<function interactiveSequenceObservable at 0x0000001BFEBB6F28>, <built-in function print>)

Filtering a Sequence

subject = Observable.from_iterable(range(10))
disposable = subject.filter(lambda x : x % 2).subscribe(print)

Interactive Rx - Filtering

def interactiveFilter(observer, count, divisor):
    sequence = range(1, count+1)
    subject = Observable.from_iterable(sequence)
    filtered = subject.filter(lambda x: x % divisor)

In [21]:
callback = partial(interactiveFilter, print)
callback.__name__ = interactiveFilter.__name__

countSlider = widgets.IntSlider(min=2, max=50, value=10)
divisorSlider = widgets.IntSlider(min=2, max=10, value=2)

interact(callback, count=countSlider, divisor=divisorSlider)

functools.partial(<function interactiveFilter at 0x0000001BFEBB6840>, <built-in function print>)

Transforming a Sequence

subject = Observable.from_iterable(range(10))
disposable = subject.map(lambda x : x * 2).subscribe(print)

Interactive Rx - Transformations

def interactiveTransformer(observer, count, code, debug=False):
    sequence = range(1, count+1)
    if debug:
        print(sequence, tuple(sequence))
    subject = Observable.from_iterable(sequence)
    callback = eval(code)
    filtered = subject.map(callback)

In [24]:
callback.__name__ = interactiveTransformer.__name__

countSlider = widgets.IntSlider(min=2, max=50, value=10)
codeText = widgets.Text(value="lambda x: x % 2")
debugCheckBox = widgets.Checkbox(value=False)

interact(callback, count=countSlider, code=codeText, debug=debugCheckBox)

functools.partial(<function interactiveTransformer at 0x0000001BFEBE7950>, <built-in function print>)

codeText = widgets.Text(value="lambda x: x % 2")

interact(callback, count=countSlider, code=codeText, debug=debugCheckBox)

functools.partial(<function interactiveTransformer at 0x0000001BFEBE7950>, <built-in function print>)


xs = Observable.from_iterable(range(1, 6))
ys = Observable.from_iterable('abcde')
xsMys = xs.merge(ys)

Space-Time of rx

Marbles and Marble Diagrams

- = Timespan of 100 ms

x = on_error()

| = on_completed()

from rx.testing import marbles

xs = Observable.from_marbles('a-b-c-|')



xs = Observable.from_marbles('a-b-c-x-e')
ys = Observable.from_marbles('1-2-3-4-5')


xs = Observable.from_marbles('1-2-3-x-4')
ys = Observable.from_marbles('1-2-3-4-5')



Note the -x and -4x. The xs and ys stream items may come in any order

Interactive Rx - Marbles

def interactiveMarbles(stream1, stream2):
    xs = Observable.from_marbles(stream1)
    ys = Observable.from_marbles(stream2)

stream1Marble = widgets.Text(value="1-2-3-4-5")
stream2Marble = widgets.Text(value="a-b-c-d-e")

interact(interactiveMarbles, stream1=stream1Marble, stream2=stream2Marble)


result = eval('lambda x: x * 2')
tuple(map(result, (1, 2, 3, 4)))

(2, 4, 6, 8)