In [1]:
%run startup.py

In [2]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 1: NEW Observables.

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.

Table of Contents

Usage

There are no configured behind the scenes imports or code except startup.py, which defines output helper functions, mainly:

  • rst, reset_start_time: resets a global timer, in order to have use cases starting from 0.
  • subs(observable): subscribes to an observable, printing notifications with time, thread, value

All other code is explicitly given in the notebook.
Since all initialisiation of tools is in the first cell, you always have to run the first cell after ipython kernel restarts.
All other cells are autonmous.

In the use case functions, in contrast to the official examples we simply use rand quite often (mapped to randint(0, 100)), to demonstrate when/how often observable sequences are generated and when their result is buffered for various subscribers.
When in doubt then run the cell again, you might have been "lucky" and got the same random.

RxJS

The (bold printed) operator functions are linked to the official documentation and created roughly analogous to the RxJS examples. The rest of the TOC lines links to anchors within the notebooks.

Output

When the output is not in marble format we display it like so:

new subscription on stream 276507289 

   3.4  M [next]    1.4: {'answer': 42}
   3.5 T1 [cmpl]    1.6: fin

where the lines are syncronously printed as they happen. "M" and "T1" would be thread names ("M" is main thread).
For each use case in reset_start_time() (alias rst), a global timer is set to 0 and we show the offset to it, in milliseconds & with one decimal value and also the offset to the start of stream subscription. In the example 3.4, 3.5 are millis since global counter reset, while 1.4, 1.6 are offsets to start of subscription.

I want to create a NEW Observable...

... that emits a particular item: just


In [3]:
reset_start_time(O.just)
stream = O.just({'answer': rand()})
disposable = subs(stream)
sleep(0.5)
disposable = subs(stream) # same answer
# all stream ops work, its a real stream:
disposable = subs(stream.map(lambda x: x.get('answer', 0) * 2))



========== return_value ==========

module rx.linq.observable.returnvalue
@extensionclassmethod(Observable, alias="just")
def return_value(cls, value, scheduler=None):
    Returns an observable sequence that contains a single element,
    using the specified scheduler to send out observer messages.
    There is an alias called 'just'.

    example
    res = rx.Observable.return(42)
    res = rx.Observable.return(42, rx.Scheduler.timeout)

    Keyword arguments:
    value -- Single element in the resulting observable sequence.
    scheduler -- [Optional] Scheduler to send the single element on. If
        not specified, defaults to Scheduler.immediate.

    Returns an observable sequence containing the single specified
    element.
--------------------------------------------------------------------------------

   1.8     M New subscription on stream 273460685
   2.8     M [next]    0.9: {'answer': 66}
   3.3     M [cmpl]    1.5: fin

 504.5     M New subscription on stream 273460685
 505.0     M [next]    0.3: {'answer': 66}
 505.1     M [cmpl]    0.4: fin

 505.5     M New subscription on stream 272024237
 506.3     M [next]    0.7: 132
 506.8     M [cmpl]    1.1: fin

..that was returned from a function called at subscribe-time: start


In [4]:
print('There is a little API difference to RxJS, see Remarks:\n')
rst(O.start)

def f():
    log('function called')
    return rand()

stream = O.start(func=f)
d = subs(stream)
d = subs(stream)

header("Exceptions are handled correctly (an observable should never except):")

def breaking_f():    
    return 1 / 0

stream = O.start(func=breaking_f)
d = subs(stream)
d = subs(stream)



# startasync: only in python3 and possibly here(?) http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future
#stream = O.start_async(f)
#d = subs(stream)


There is a little API difference to RxJS, see Remarks:



========== start ==========

module rx.linq.observable.start
@extensionclassmethod(Observable)
def start(cls, func, scheduler=None):
    Invokes the specified function asynchronously on the specified
    scheduler, surfacing the result through an observable sequence.

    Example:
    res = rx.Observable.start(lambda: pprint('hello'))
    res = rx.Observable.start(lambda: pprint('hello'), rx.Scheduler.timeout)

    Keyword arguments:
    func -- {Function} Function to run asynchronously.
    scheduler -- {Scheduler} [Optional] Scheduler to run the function on. If
        not specified, defaults to Scheduler.timeout.

    Returns {Observable} An observable sequence exposing the function's
    result value, or an exception.

    Remarks:
    The function is called immediately, not during the subscription of the
    resulting sequence. Multiple subscriptions to the resulting sequence can
    observe the function's result.
--------------------------------------------------------------------------------

   2.7    T4 function called   3.2     M New subscription on stream 274466149

   3.7     M [next]    0.4: 43
   3.8     M [cmpl]    0.5: fin

   4.7     M New subscription on stream 274466149
   5.1     M [next]    0.2: 43
   5.3     M [cmpl]    0.4: fin


========== Exceptions are handled correctly (an observable should never except): ==========


   6.9     M New subscription on stream 274466197
   7.5     M [err ]    0.5: integer division or modulo by zero

   8.4     M New subscription on stream 274466197
   8.9     M [err ]    0.3: integer division or modulo by zero

..that was returned from an Action, Callable, Runnable, or something of that sort, called at subscribe-time: from


In [5]:
rst(O.from_iterable)
def f():
    log('function called')
    return rand()
# aliases: O.from_, O.from_list
# 1.: From a tuple:
stream = O.from_iterable((1,2,rand()))
d = subs(stream)
# d = subs(stream) # same result

# 2. from a generator
gen = (rand() for j in range(3))
stream = O.from_iterable(gen)
d = subs(stream)



========== from_iterable ==========

module rx.linq.observable.fromiterable
@extensionclassmethod(Observable, alias=["from_", "from_list"])
def from_iterable(cls, iterable, scheduler=None):
    Converts an array to an observable sequence, using an optional
    scheduler to enumerate the array.

    1 - res = rx.Observable.from_iterable([1,2,3])
    2 - res = rx.Observable.from_iterable([1,2,3], rx.Scheduler.timeout)

    Keyword arguments:
    :param Observable cls: Observable class
    :param Scheduler scheduler: [Optional] Scheduler to run the
        enumeration of the input sequence on.

    :returns: The observable sequence whose elements are pulled from the
        given iterable sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   3.3     M New subscription on stream 274466081
   3.8     M [next]    0.4: 1
   4.1     M [next]    0.7: 2
   4.6     M [next]    1.1: 95
   4.8     M [cmpl]    1.4: fin

   5.4     M New subscription on stream 274466125
   5.6     M [next]    0.2: 29
   6.0     M [next]    0.6: 29
   6.2     M [next]    0.8: 15
   6.3     M [cmpl]    0.9: fin

In [6]:
rst(O.from_callback)
# in my words: In the on_next of the subscriber you'll have the original arguments,
# potentially objects, e.g. user original http requests.
# i.e. you could merge those with the result stream of a backend call to
# a webservice or db and send the request.response back to the user then.

def g(f, a, b):
    f(a, b)
    log('called f')
stream = O.from_callback(lambda a, b, f: g(f, a, b))('fu', 'bar')
d = subs(stream.delay(200))
# d = subs(stream.delay(200)) # does NOT work



========== from_callback ==========

module rx.linq.observable.fromcallback
@extensionclassmethod(Observable)
def from_callback(cls, func, mapper=None):
    Converts a callback function to an observable sequence.

    Keyword arguments:
    func -- {Function} Function with a callback as the last parameter to
        convert to an Observable sequence.
    mapper -- {Function} [Optional] A mapper which takes the arguments
        from the callback to produce a single item to yield on next.

    Returns {Function} A function, when executed with the required
    parameters minus the callback, produces an Observable sequence with a
    single value of the arguments to the callback as a list.
--------------------------------------------------------------------------------

   4.6     M New subscription on stream 272024249
   5.9     M called f

...after a specified delay: timer


In [7]:
rst()
# start a stream of 0, 1, 2, .. after 200 ms, with a delay of 100 ms:
stream = O.timer(200, 100).time_interval()\
    .map(lambda x: 'val:%s dt:%s' % (x.value, x.interval))\
    .take(3)
d = subs(stream, name='observer1')
# intermix directly with another one
d = subs(stream, name='observer2')


   0.8     M New subscription on stream 274470005

   3.4     M New subscription on stream 274470005

...that emits a sequence of items repeatedly: repeat


In [8]:
rst(O.repeat)
# repeat is over *values*, not function calls. Use generate or create for function calls!
subs(O.repeat({'rand': time.time()}, 3))

header('do while:')
l = []
def condition(x):
    l.append(1)
    return True if len(l) < 2 else False
stream = O.just(42).do_while(condition)
d = subs(stream)



========== repeat ==========

module rx.linq.observable.repeat
@extensionclassmethod(Observable)
def repeat(cls, value=None, repeat_count=None, scheduler=None):
    Generates an observable sequence that repeats the given element the
    specified number of times, using the specified scheduler to send out
    observer messages.

    1 - res = rx.Observable.repeat(42)
    2 - res = rx.Observable.repeat(42, 4)
    3 - res = rx.Observable.repeat(42, 4, Rx.Scheduler.timeout)
    4 - res = rx.Observable.repeat(42, None, Rx.Scheduler.timeout)

    Keyword arguments:
    value -- Element to repeat.
    repeat_count -- [Optional] Number of times to repeat the element. If not
        specified, repeats indefinitely.
    scheduler -- Scheduler to run the producer loop on. If not specified,
        defaults to ImmediateScheduler.

    Returns an observable sequence that repeats the given element the
    specified number of times.
--------------------------------------------------------------------------------

   2.0     M New subscription on stream 274473961
   2.9     M [next]    0.9: {'rand': 1482335562.344726}
   4.5     M [next]    2.4: {'rand': 1482335562.344726}
   5.1     M [next]    3.0: {'rand': 1482335562.344726}
   5.2     M [cmpl]    3.1: fin


========== do while: ==========


   6.8     M New subscription on stream 273460681
   7.5     M [next]    0.5: 42
   8.7     M [next]    1.7: 42
   9.2     M [cmpl]    2.2: fin

...from scratch, with custom logic and cleanup (calling a function again and again): create


In [9]:
rx = O.create
rst(rx)

def f(obs):
    # this function is called for every observer
    obs.on_next(rand())
    obs.on_next(rand())
    obs.on_completed()
    def cleanup():
        log('cleaning up...')
    return cleanup
stream = O.create(f).delay(200) # the delay causes the cleanup called before the subs gets the vals
d = subs(stream)
d = subs(stream)




sleep(0.5)
rst(title='Exceptions are handled nicely')
l = []
def excepting_f(obs):
    for i in range(3):
        l.append(1)
        obs.on_next('%s %s (observer hash: %s)' % (i, 1. / (3 - len(l)), hash(obs) ))
    obs.on_completed()

stream = O.create(excepting_f)
d = subs(stream)
d = subs(stream)




rst(title='Feature or Bug?')
print('(where are the first two values?)')
l = []
def excepting_f(obs):
    for i in range(3):
        l.append(1)
        obs.on_next('%s %s (observer hash: %s)' % (i, 1. / (3 - len(l)), hash(obs) ))
    obs.on_completed()

stream = O.create(excepting_f).delay(100)
d = subs(stream)
d = subs(stream)
# I think its an (amazing) feature, preventing to process functions results of later(!) failing functions



========== create ==========

module rx.linq.observable.create
@extensionclassmethod(Observable, alias="create")
def create(cls, subscribe):
    n.a.
--------------------------------------------------------------------------------

   2.4     M New subscription on stream 273454757
   3.9     M cleaning up...

   4.5     M New subscription on stream 273454757
   5.8     M cleaning up...
 131.3    T6 [next]  202.3: ['fu', 'bar']
 131.7    T6 [cmpl]  202.7: fin
 142.0    T7 [next]  202.4: val:0 dt:0:00:00.202066 (observer1)
 144.0    T8 [next]  201.8: val:0 dt:0:00:00.201505 (observer2)
 208.2    T9 [next]  205.7: 59
 208.8    T9 [next]  206.3: 68
 209.2    T9 [cmpl]  206.7: fin
 209.6   T10 [next]  204.9: 84
 210.0   T10 [next]  205.3: 79
 210.2   T10 [cmpl]  205.4: fin
 246.3   T12 [next]  304.1: val:1 dt:0:00:00.102253 (observer2)
 247.0   T11 [next]  307.4: val:1 dt:0:00:00.104979 (observer1)
 345.7   T14 [next]  406.1: val:2 dt:0:00:00.098724 (observer1)
 346.0   T14 [cmpl]  406.4: fin (observer1)
 348.3   T13 [next]  406.2: val:2 dt:0:00:00.102073 (observer2)
 348.5   T13 [cmpl]  406.3: fin (observer2)
MainThread:*** Exception: float division by zero
   0.6     M New subscription on stream 274475781
   1.2     M [next]    0.3: 0 0.5 (observer hash: 272024249)
   1.6     M [next]    0.7: 1 1.0 (observer hash: 272024249)
   2.2     M [err ]    1.3: float division by zero

   2.9     M New subscription on stream 274475781
   3.2     M [next]    0.2: 0 -1.0 (observer hash: 272024253)
   3.5     M [next]    0.5: 1 -0.5 (observer hash: 272024253)
   4.0     M [next]    1.0: 2 -0.333333333333 (observer hash: 272024253)
   4.1     M [cmpl]    1.0: fin
(where are the first two values?)

   0.7     M New subscription on stream 273460701
   3.8     M [err ]    2.9: float division by zero

   4.4     M New subscription on stream 273460701

In [10]:
rx = O.generate
rst(rx)
"""The basic form of generate takes four parameters:

the first item to emit
a function to test an item to determine whether to emit it (true) or terminate the Observable (false)
a function to generate the next item to test and emit based on the value of the previous item
a function to transform items before emitting them
"""
def generator_based_on_previous(x): return x + 1.1
def doubler(x): return 2 * x
d = subs(rx(0, lambda x: x < 4, generator_based_on_previous, doubler))



========== generate ==========

module rx.linq.observable.generate
@extensionclassmethod(Observable)
def generate(cls, initial_state, condition, iterate, result_mapper, scheduler=None):
    Generates an observable sequence by running a state-driven loop
    producing the sequence's elements, using the specified scheduler to
    send out observer messages.

    1 - res = rx.Observable.generate(0,
        lambda x: x < 10,
        lambda x: x + 1,
        lambda x: x)
    2 - res = rx.Observable.generate(0,
        lambda x: x < 10,
        lambda x: x + 1,
        lambda x: x,
        Rx.Scheduler.timeout)

    Keyword arguments:
    initial_state -- Initial state.
    condition -- Condition to terminate generation (upon returning False).
    iterate -- Iteration step function.
    result_mapper -- Selector function for results produced in the
        sequence.
    scheduler -- [Optional] Scheduler on which to run the generator loop.
        If not provided, defaults to CurrentThreadScheduler.

    Returns the generated sequence.
--------------------------------------------------------------------------------

   4.8     M New subscription on stream 274475993
   5.7     M [next]    0.7: 0
   6.4     M [next]    1.4: 2.2
   6.6     M [next]    1.6: 4.4
   7.1     M [next]    2.1: 6.6
   7.3     M [cmpl]    2.3: fin

In [11]:
rx = O.generate_with_relative_time
rst(rx)
stream = rx(1, lambda x: x < 4, lambda x: x + 1, lambda x: x, lambda t: 100)
d = subs(stream)



========== generate_with_relative_time ==========

module rx.linq.observable.generatewithrelativetime
@extensionclassmethod(Observable)
def generate_with_relative_time(cls, initial_state, condition, iterate,
    Generates an observable sequence by iterating a state from an
    initial state until the condition fails.

    res = source.generate_with_relative_time(0,
        lambda x: True,
        lambda x: x + 1,
        lambda x: x,
        lambda x: 500)

    initial_state -- Initial state.
    condition -- Condition to terminate generation (upon returning false).
    iterate -- Iteration step function.
    result_mapper -- Selector function for results produced in the
        sequence.
    time_mapper -- Time mapper function to control the speed of values
        being produced each iteration, returning integer values denoting
        milliseconds.
    scheduler -- [Optional] Scheduler on which to run the generator loop.
        If not specified, the timeout scheduler is used.

    Returns the generated sequence.
--------------------------------------------------------------------------------

   4.7     M New subscription on stream 274475933

...for each observer that subscribes OR according to a condition at subscription time: defer / if_then


In [12]:
rst(O.defer)
# plural! (unique per subscription)
streams = O.defer(lambda: O.just(rand()))
d = subs(streams)
d = subs(streams) # gets other values - created by subscription!



========== defer ==========

module rx.linq.observable.defer
@extensionclassmethod(Observable)
def defer(cls, observable_factory):
    Returns an observable sequence that invokes the specified factory
    function whenever a new observer subscribes.

    Example:
    1 - res = rx.Observable.defer(lambda: rx.Observable.from_([1,2,3]))

    Keyword arguments:
    :param types.FunctionType observable_factory: Observable factory function
        to invoke for each observer that subscribes to the resulting sequence.

    :returns: An observable sequence whose observers trigger an invocation
    of the given observable factory function.
    :rtype: Observable
--------------------------------------------------------------------------------

   2.7     M New subscription on stream 274475969
   3.4     M [next]    0.6: 38
   3.5     M [cmpl]    0.7: fin

   4.4     M New subscription on stream 274475969
   4.9     M [next]    0.4: 77
   5.2     M [cmpl]    0.7: fin

In [13]:
# evaluating a condition at subscription time in order to decide which of two streams to take.
rst(O.if_then)
cond = True
def should_run():
    return cond
streams = O.if_then(should_run, O.return_value(43), O.return_value(56))
d = subs(streams)

log('condition will now evaluate falsy:')
cond = False
streams = O.if_then(should_run, O.return_value(43), O.return_value(rand()))
d = subs(streams)
d = subs(streams)



========== if_then ==========

module rx.linq.observable.ifthen
@extensionclassmethod(Observable)
def if_then(cls, condition, then_source, else_source=None, scheduler=None):
    Determines whether an observable collection contains values.

    Example:
    1 - res = rx.Observable.if(condition, obs1)
    2 - res = rx.Observable.if(condition, obs1, obs2)
    3 - res = rx.Observable.if(condition, obs1, scheduler=scheduler)

    Keyword parameters:
    condition -- {Function} The condition which determines if the
        then_source or else_source will be run.
    then_source -- {Observable} The observable sequence or Promise that
        will be run if the condition function returns true.
    else_source -- {Observable} [Optional] The observable sequence or
        Promise that will be run if the condition function returns False.
        If this is not provided, it defaults to rx.empty
    scheduler -- [Optional] Scheduler to use.

    Returns an observable {Observable} sequence which is either the
    then_source or else_source.
--------------------------------------------------------------------------------

   3.3     M New subscription on stream 274480673
   3.6     M [next]    0.2: 43
   3.8     M [cmpl]    0.5: fin
   4.0     M condition will now evaluate falsy:

   4.4     M New subscription on stream 274480817
   4.6     M [next]    0.2: 52
   4.7     M [cmpl]    0.3: fin

   5.2     M New subscription on stream 274480817
   5.6     M [next]    0.2: 52
   5.8     M [cmpl]    0.4: fin

...that emits a sequence of integers: range


In [14]:
rst(O.range)
d = subs(O.range(0, 3))



========== range ==========

module rx.linq.observable.range
@extensionclassmethod(Observable)
def range(cls, start, count, scheduler=None):
    Generates an observable sequence of integral numbers within a
    specified range, using the specified scheduler to send out observer
    messages.

    1 - res = Rx.Observable.range(0, 10)
    2 - res = Rx.Observable.range(0, 10, rx.Scheduler.timeout)

    Keyword arguments:
    start -- The value of the first integer in the sequence.
    count -- The number of sequential integers to generate.
    scheduler -- [Optional] Scheduler to run the generator loop on. If not
        specified, defaults to Scheduler.current_thread.

    Returns an observable sequence that contains a range of sequential
    integral numbers.
--------------------------------------------------------------------------------

   2.9     M New subscription on stream 274475905
   3.7     M [next]    0.4: 0
   4.3     M [next]    1.0: 1
   4.6     M [next]    1.3: 2
   4.9     M [cmpl]    1.6: fin

...at particular intervals of time: interval

(you can .publish() it to get an easy "hot" observable)


In [5]:
rst(O.interval)
d = subs(O.interval(100).time_interval()\
         .map(lambda x, v: '%(interval)s %(value)s' \
              % ItemGetter(x)).take(3))



========== interval ==========

module rx.linq.observable.interval
@extensionclassmethod(Observable)
def interval(cls, period, scheduler=None):
    Returns an observable sequence that produces a value after each
    period.

    Example:
    1 - res = rx.Observable.interval(1000)
    2 - res = rx.Observable.interval(1000, rx.Scheduler.timeout)

    Keyword arguments:
    period -- Period for producing the values in the resulting sequence
        (specified as an integer denoting milliseconds).
    scheduler -- [Optional] Scheduler to run the timer on. If not specified,
        rx.Scheduler.timeout is used.

    Returns an observable sequence that produces a value after each period.
--------------------------------------------------------------------------------

   1.2     M New subscription (14365) on stream 276610125
 102.3    T8 [next]  100.9: 0:00:00.100623 0 -> 14365
 208.2    T9 [next]  206.9: 0:00:00.105960 1 -> 14365
 310.8   T10 [next]  309.5: 0:00:00.102625 2 -> 14365
 311.1   T10 [cmpl]  309.8: fin -> 14365

...after a specified delay (see timer)

...that completes without emitting items: empty


In [16]:
rst(O.empty)
d = subs(O.empty())



========== empty ==========

module rx.linq.observable.empty
@extensionclassmethod(Observable)
def empty(cls, scheduler=None):
    Returns an empty observable sequence, using the specified scheduler
    to send out the single OnCompleted message.

    1 - res = rx.empty()
    2 - res = rx.empty(rx.Scheduler.timeout)

    scheduler -- Scheduler to send the termination call on.

    Returns an observable sequence with no elements.
--------------------------------------------------------------------------------

   2.9     M New subscription on stream 273460593
   3.2     M [cmpl]    0.2: fin

...that does nothing at all: never


In [17]:
rst(O.never)
d = subs(O.never())



========== never ==========

   0.7   T18 [next]  104.4: 0 -1.0 (observer hash: 274473797)
   1.1   T18 [next]  104.8: 1 -0.5 (observer hash: 274473797)module rx.linq.observable.never
@extensionclassmethod(Observable)
def never(cls):
    Returns a non-terminating observable sequence, which can be used to
    denote an infinite duration (e.g. when using reactive joins).

    Returns an observable sequence whose observers will never get called.
--------------------------------------------------------------------------------

   2.0   T18 [next]  105.7: 2 -0.333333333333 (observer hash: 274473797)

   2.1   T18 [cmpl]  105.9: fin   2.7     M New subscription on stream 274473849

...that excepts: throw


In [7]:
rst(O.on_error)
d = subs(O.on_error(ZeroDivisionError))



========== throw ==========

module rx.linq.observable.throw
@extensionclassmethod(Observable, alias="throw_exception")
def on_error(cls, exception, scheduler=None):
    Returns an observable sequence that terminates with an exception,
    using the specified scheduler to send out the single OnError message.

    1 - res = rx.throw(Exception('Error'))
    2 - res = rx.throw(Exception('Error'),
                                            rx.Scheduler.timeout)

    Keyword arguments:
    exception -- An object used for the sequence's termination.
    scheduler -- Scheduler to send the exceptional termination call on. If
        not specified, defaults to ImmediateScheduler.

    Returns the observable sequence that terminates exceptionally with the
    specified exception object.
--------------------------------------------------------------------------------

   1.8     M New subscription (23467) on stream 276521733
   2.0     M [err ]    0.2: <type 'exceptions.ZeroDivisionError'> -> 23467

In [ ]: