In [2]:
%run startup.py

In [3]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 2: Combining Observables

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
We also require acquaintance with the marble diagrams feature of RxPy.

This is a helpful accompanying read.

Table of Contents

I want to create an Observable by combining other Observables

... and emitting all of the items from all of the Observables in whatever order they are received: merge / merge_all


In [6]:
reset_start_time(O.merge)
l = []
def excepting_f(obs):
    for i in range(10):
        l.append(1)
        obs.on_next(1 / (3 - len(l)))
stream1 = O.from_(('a', 'b', 'c'))
stream2 = O.create(excepting_f)
# merged stream stops in any case at first exception!
# No guarantee of order of those immediately created streams !
d = subs(stream1.merge(stream2))
l = []
d = subs(O.merge(new_thread_scheduler, [stream1, stream2]))



========== merge ==========

module rx.linq.observable.merge
@extensionclassmethod(Observable)  # noqa
def merge(cls, *args):
    Merges all the observable sequences into a single observable
    sequence. The scheduler is optional and if not specified, the
    immediate scheduler is used.

    1 - merged = rx.Observable.merge(xs, ys, zs)
    2 - merged = rx.Observable.merge([xs, ys, zs])
    3 - merged = rx.Observable.merge(scheduler, xs, ys, zs)
    4 - merged = rx.Observable.merge(scheduler, [xs, ys, zs])

    Returns the observable sequence that merges the elements of the
    observable sequences.
--------------------------------------------------------------------------------

   2.3     M New subscription on stream 276542077
   3.0     M [next]    0.5: 0
   3.2     M [next]    0.7: 1
   3.4     M [err ]    0.9: integer division or modulo by zero
   5.4    T5 [next]    1.1: a

   5.9    T5 [next]    1.6: b   4.2     M New subscription on stream 276542101

   6.0    T5 [next]    1.7: c
   6.5    T5 [next]    2.2: 0
   6.7    T5 [next]    2.4: 1
   6.8    T5 [err ]    2.5: integer division or modulo by zero

In [167]:
rst(O.merge_all, title='merge_all')
meta = O.repeat(O.from_((1, 2, 3)), 3)
# no guarantee of order, immediatelly created:
d = subs(meta.merge_all())
# Introducing delta ts:
d = subs(O.repeat(O.timer(10, 10)\
                  .take(3), 3)\
                  .merge_all(),
         name='streams with time delays between events')



========== merge_all ==========

function merge_all of module rx.linq.observable.merge:
Merges an observable sequence of observable sequences into an
    observable sequence.

    Returns the observable sequence that merges the elements of the inner
    sequences.
    
--------------------------------------------------------------------------------

   1.0     M New subscription on stream 276411209
   2.5     M [next]    1.2: 1
   3.6     M [next]    2.4: 2
   3.9     M [next]    2.6: 1
   4.5     M [next]    3.2: 3
   4.9     M [next]    3.6: 2
   5.0     M [next]    3.8: 1
   5.6     M [next]    4.4: 3
   5.9     M [next]    4.6: 2
   6.5     M [next]    5.2: 3
   6.8     M [cmpl]    5.5: fin

   7.3     M New subscription on stream 276483801
  19.0 T1046 [next]   11.7: 0 (streams with time delays between events)
  19.9 T1047 [next]   12.6: 0 (streams with time delays between events)
  21.1 T1048 [next]   13.8: 0 (streams with time delays between events)
  30.2 T1049 [next]   22.9: 1 (streams with time delays between events)
  31.6 T1050 [next]   24.4: 1 (streams with time delays between events)
  32.4 T1051 [next]   25.1: 1 (streams with time delays between events)
  41.7 T1052 [next]   34.4: 2 (streams with time delays between events)
  42.4 T1053 [next]   35.1: 2 (streams with time delays between events)
  43.3 T1054 [next]   36.0: 2 (streams with time delays between events)
  43.8 T1054 [cmpl]   36.5: fin (streams with time delays between events)

... and emitting all of the items from all of the Observables, one Observable at a time: concat


In [168]:
rst(O.concat)
s1 = O.from_((1, 2))
s2 = O.from_((3, 4))
# while normal subscriptions work as expected...
d1, d2 = subs(s1), subs(s2)
# ... another one can have the order reversed
d = subs(O.concat([s2, s1]))


function concat of module rx.linq.observable.concat:
Concatenates all the observable sequences.

    1 - res = Observable.concat(xs, ys, zs)
    2 - res = Observable.concat([xs, ys, zs])

    Returns an observable sequence that contains the elements of each given
    sequence, in sequential order.
    
--------------------------------------------------------------------------------

   0.9     M New subscription on stream 278620693
   1.4     M [next]    0.3: 1
   1.6     M [next]    0.6: 2
   1.8     M [cmpl]    0.8: fin

   2.2     M New subscription on stream 278620893
   2.7     M [next]    0.4: 3
   2.8     M [next]    0.6: 4
   3.0     M [cmpl]    0.7: fin

   3.4     M New subscription on stream 276406121
   3.8     M [next]    0.3: 3
   4.1     M [next]    0.5: 4
   4.4     M [next]    0.8: 1
   4.6     M [next]    1.1: 2
   5.0     M [cmpl]    1.5: fin

In [169]:
rst()
# See the marbles notebook:
s1 = O.from_marbles('1--2---3|').to_blocking()
s2 = O.from_marbles('--a-b-c|' ).to_blocking()
d = (subs(s1, name='A'),
     subs(s2, name='B'))
rst(title="Concatenating in reverse order", sleep=1)
d = subs(O.concat([s2, s1]), name='C')


   0.9     M New subscription on stream 276485909

   3.8     M New subscription on stream 276485905
   5.8     M main thread sleeping 1s
  12.0 T1058 [next]   10.9: 1 (A)
 219.2 T1066 [next]  215.4: a (B)
 226.2 T1059 [next]  225.1: 2 (A)
 325.2 T1068 [next]  321.4: b (B)
 439.1 T1069 [next]  435.3: c (B)
 446.4 T1070 [cmpl]  442.5: fin (B)
 536.3 T1061 [next]  535.2: 3 (A)
 547.5 T1063 [cmpl]  546.3: fin (A)


========== Concatenating in reverse order ==========


   0.5     M New subscription on stream 276308601
 216.5 T1074 [next]  215.7: a (C)
 327.7 T1075 [next]  326.9: b (C)
 436.7 T1077 [next]  436.0: c (C)
 456.4 T1082 [next]  455.7: 1 (C)
 670.3 T1083 [next]  669.5: 2 (C)
 976.9 T1085 [next]  976.2: 3 (C)
 990.9 T1087 [cmpl]  990.1: fin (C)

... by combining the items from two or more Observables sequentially to come up with new items to emit

... whenever each of the Observables has emitted a new item zip / zip_list


In [176]:
rst(O.zip)
s1 = O.range(0, 5)
d = subs(O.zip(s1, s1.skip(1), s1.skip(2), lambda s1, s2, s3: '%s : %s : %s' % (s1, s2, s3)))


function zip of module rx.linq.observable.zip:
Merges the specified observable sequences into one observable
    sequence by using the mapper function whenever all of the observable
    sequences have produced an element at a corresponding index.

    The last element in the arguments must be a function to invoke for each
    series of elements at corresponding indexes in the sources.

    Arguments:
    args -- Observable sources.

    Returns an observable {Observable} sequence containing the result of
    combining elements of the sources using the specified result mapper
    function.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 276463149
   3.0     M [next]    2.2: 0 : 1 : 2
   3.5     M [next]    2.6: 1 : 2 : 3
   3.9     M [next]    3.0: 2 : 3 : 4
   4.3     M [cmpl]    3.4: fin

In [179]:
rst(O.zip_list) # alias: zip_array
s1 = O.range(0, 5)
d = subs(O.zip_list(s1, s1.skip(1), s1.skip(2)))


function zip_list of module rx.linq.observable.ziparray:
Merge the specified observable sequences into one observable
    sequence by emitting a list with the elements of the observable
    sequences at corresponding indexes.

    Keyword arguments:
    :param Observable cls: Class
    :param Tuple args: Observable sources.

    :return: Returns an observable sequence containing lists of
    elements at corresponding indexes.
    :rtype: Observable
    
--------------------------------------------------------------------------------

   0.9     M New subscription on stream 276480037
   2.9     M [next]    1.8: [0, 1, 2]
   3.3     M [next]    2.2: [1, 2, 3]
   3.9     M [next]    2.8: [2, 3, 4]
   4.4     M [cmpl]    3.3: fin

... whenever any of the Observables has emitted a new item combine_latest


In [17]:
rst(O.combine_latest, title='combine_latest')
s1 = O.interval(100).map(lambda i: 'First : %s' % i)
s2 = O.interval(150).map(lambda i: 'Second: %s' % i)
# the start is interesting, both must have emitted, so it starts at 150ms with 0/0:
d = subs(s1.combine_latest(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))

rst(title='For comparison: merge', sleep=1)
d = subs(s1.merge(s2).take(6))



========== combine_latest ==========

function combine_latest of module rx.linq.observable.combinelatest:
Merges the specified observable sequences into one observable
    sequence by using the mapper function whenever any of the
    observable sequences produces an element.

    1 - obs = Observable.combine_latest(obs1, obs2, obs3,
                                       lambda o1, o2, o3: o1 + o2 + o3)
    2 - obs = Observable.combine_latest([obs1, obs2, obs3],
                                        lambda o1, o2, o3: o1 + o2 + o3)

    Returns an observable sequence containing the result of combining
    elements of the sources using the specified result mapper
    function.
    
--------------------------------------------------------------------------------

   1.0     M New subscription on stream 276805405
   3.0     M main thread sleeping 1s
 157.1  T195 [next]  156.1: First : 0, Second: 0
 211.6  T196 [next]  210.6: First : 1, Second: 0
 312.5  T197 [next]  311.6: First : 1, Second: 1
 316.0  T198 [next]  315.0: First : 2, Second: 1
 421.2  T200 [next]  420.2: First : 3, Second: 1
 465.7  T199 [next]  464.7: First : 3, Second: 2
 465.9  T199 [cmpl]  464.9: fin


========== For comparison: merge ==========


   1.0     M New subscription on stream 276157517
 104.0  T204 [next]  103.0: First : 0
 157.6  T203 [next]  156.5: Second: 0
 207.3  T205 [next]  206.2: First : 1
 311.4  T207 [next]  310.4: First : 2 311.7  T206 [next]  310.7: Second: 1

 417.5  T208 [next]  416.5: First : 3
 417.8  T208 [cmpl]  416.7: fin

... whenever the first of the Observables has emitted a new item with_latest_from


In [16]:
rst(O.with_latest_from, title='with_latest_from')
s1 = O.interval(140).map(lambda i: 'First : %s' % i)
s2 = O.interval(50) .map(lambda i: 'Second: %s' % i)
d = subs(s1.with_latest_from(s2, lambda s1, s2: '%s, %s' % (s1, s2)).take(6))



========== with_latest_from ==========

function with_latest_from of module rx.linq.observable.withlatestfrom:
Merges the specified observable sequences into one observable sequence
    by using the mapper function only when the first observable sequence
    produces an element. The observables can be passed either as seperate
    arguments or as a list.

    1 - obs = Observable.with_latest_from(obs1, obs2, obs3,
                                       lambda o1, o2, o3: o1 + o2 + o3)
    2 - obs = Observable.with_latest_from([obs1, obs2, obs3],
                                        lambda o1, o2, o3: o1 + o2 + o3)

    Returns an observable sequence containing the result of combining
    elements of the sources using the specified result mapper function.
    
--------------------------------------------------------------------------------

   1.0     M New subscription on stream 276799777
 147.0  T170 [next]  146.0: First : 0, Second: 1
 292.0  T174 [next]  290.9: First : 1, Second: 4
 437.8  T178 [next]  436.7: First : 2, Second: 7
 581.7  T182 [next]  580.7: First : 3, Second: 9
 723.3  T185 [next]  722.2: First : 4, Second: 12
 867.0  T189 [next]  865.9: First : 5, Second: 15
 867.4  T189 [cmpl]  866.3: fin

... whenever an item is emitted by one Observable in a window defined by an item emitted by another join

The join operator takes four parameters:

  1. the second Observable to combine with the source Observable
  2. a function that accepts an item from the source Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the second Observable
  3. a function that accepts an item from the second Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the first Observable
  4. a function that accepts an item from the first Observable and an item from the second Observable and returns an item to be emitted by the Observable returned from join

In [23]:
rst(O.join)
# this one is pretty timing critical and output seems swallowed with 2 threads (over)writing.
# better try this with timer(0) on the console. Also the scheduler of the timers is critical,
# try other O.timer schedulers...
xs = O.interval(100).map(lambda i: 'First : %s' % i)
ys = O.interval(101).map(lambda i: 'Second: %s' % i)
d = subs(xs.join(ys, lambda _: O.timer(10), lambda _: O.timer(0), lambda x, y: '%s %s' % (x, y)).take(5))


function join of module rx.linq.observable.join:
Correlates the elements of two sequences based on overlapping
    durations.

    Keyword arguments:
    right -- The right observable sequence to join elements for.
    left_duration_mapper -- A function to select the duration (expressed
        as an observable sequence) of each element of the left observable
        sequence, used to determine overlap.
    right_duration_mapper -- A function to select the duration (expressed
        as an observable sequence) of each element of the right observable
        sequence, used to determine overlap.
    result_mapper -- A function invoked to compute a result element for
        any two overlapping elements of the left and right observable
        sequences. The parameters passed to the function correspond with
        the elements from the left and right source sequences for which
        overlap occurs.

    Return an observable sequence that contains result elements computed
    from source elements that have an overlapping duration.
    
--------------------------------------------------------------------------------

   1.2     M New subscription on stream 275960897
 109.1  T449 [next]  107.8: First : 0 Second: 0
 215.3  T453 [next]  214.0: First : 1 Second: 1
 321.6  T457 [next]  320.4: First : 2 Second: 2
 426.0  T461 [next]  424.8: First : 3 Second: 3
 533.3  T465 [next]  532.1: First : 4 Second: 4
 533.7  T465 [cmpl]  532.4: fin

... or, alternatively, group_join

The groupJoin operator takes four parameters:

  1. the second Observable to combine with the source Observable
  2. a function that accepts an item from the source Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the second Observable
  3. a function that accepts an item from the second Observable and returns an Observable whose lifespan governs the duration during which that item will combine with items from the first Observable
  4. a function that accepts an item from the first Observable and an Observable that emits items from the second Observable and returns an item to be emitted by the Observable returned from groupJoin

In [27]:
rst(O.group_join, title='group_join')
xs = O.interval(100).map(lambda i: 'First : %s' % i)
ys = O.interval(100).map(lambda i: 'Second: %s' % i)
d = subs(xs.group_join(ys,
                       lambda _: O.timer(0),
                       lambda _: O.timer(0),
                       lambda x, yy: yy.select(lambda y: '%s %s' % (x, y))).merge_all().take(5))



========== group_join ==========

function group_join of module rx.linq.observable.groupjoin:
Correlates the elements of two sequences based on overlapping
    durations, and groups the results.

    Keyword arguments:
    right -- The right observable sequence to join elements for.
    left_duration_mapper -- A function to select the duration (expressed
        as an observable sequence) of each element of the left observable
        sequence, used to determine overlap.
    right_duration_mapper -- A function to select the duration (expressed
        as an observable sequence) of each element of the right observable
        sequence, used to determine overlap.
    result_mapper -- A function invoked to compute a result element for
        any element of the left sequence with overlapping elements from the
        right observable sequence. The first parameter passed to the
        function is an element of the left sequence. The second parameter
        passed to the function is an observable sequence with elements from
        the right sequence that overlap with the left sequence's element.

    Returns an observable sequence that contains result elements computed
    from source elements that have an overlapping duration.
    
--------------------------------------------------------------------------------

   1.3     M New subscription on stream 273932389
 534.5  T500 [next]  533.0: First : 4 Second: 4
 536.0  T501 [next]  534.5: First : 4 Second: 4
 640.2  T504 [next]  638.8: First : 5 Second: 5
18690.9 T1191 [next] 18689.5: First : 176 Second: 177
19010.8 T1202 [next] 19009.4: First : 179 Second: 180
19011.9 T1202 [cmpl] 19010.5: fin

... by means of Pattern and Plan intermediaries And/Then/When, and / then / when

The combination of the And, Then, and When operators behave much like the Zip operator, but they do so by means of intermediate data structures. And accepts two or more Observables and combines the emissions from each, one set at a time, into Pattern objects. Then operates on such Pattern objects, transforming them in a Plan. When in turn transforms these various Plan objects into emissions from an Observable.

details The And/Then/When trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then method). This gives you the Merge feature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.

As we delve deeper into the depths of what the Rx libraries provide us, we can see more practical usages for it. Composing sequences with Rx allows us to easily make sense of the multiple data sources a problem domain is exposed to. We can concatenate values or sequences together sequentially with StartWith, Concat and Repeat. We can process multiple sequences concurrently with Merge, or process a single sequence at a time with Amb and Switch. Pairing values with CombineLatest, Zip and the And/Then/When operators can simplify otherwise fiddly operations like our drag-and-drop examples and monitoring system status.


In [59]:
rst()
# see the similarity to zip. 
ts = time.time()
def _dt():
    # giving us info when an element was created:
    return 'from time: %.2f' % (time.time() - ts)
one = O.interval(1000) .map(lambda i: 'Seconds : %s %s' % (i, _dt())).take(5)
two = O.interval(500)  .map(lambda i: 'HalfSecs: %s %s' % (i, _dt())).take(5)
three = O.interval(100).map(lambda i: '10thS   : %s %s' % (i, _dt())).take(5)

z = O.when(
   one          \
    .and_(two)  \
    .and_(three)\
    .then_do(lambda a, b, c: '\n'.join(('', '', a, b, c))))

# from the output you see that the result stream consists of elements built at each interval
# (which is in the past for 'two' and 'three'),
# buffered until the 1 second sequence 'one' advances a step. 
d = subs(z)


   1.1     M New subscription on stream 275998165
1002.9 T1468 [next] 1001.8: 

Seconds : 0 from time: 1.00
HalfSecs: 0 from time: 0.50
10thS   : 0 from time: 0.11
2008.3 T1477 [next] 2007.2: 

Seconds : 1 from time: 2.01
HalfSecs: 1 from time: 1.01
10thS   : 1 from time: 0.21
3011.7 T1480 [next] 3010.6: 

Seconds : 2 from time: 3.01
HalfSecs: 2 from time: 1.51
10thS   : 2 from time: 0.32
4014.4 T1483 [next] 4013.3: 

Seconds : 3 from time: 4.01
HalfSecs: 3 from time: 2.01
10thS   : 3 from time: 0.42
5018.9 T1484 [next] 5017.8: 

Seconds : 4 from time: 5.02
HalfSecs: 4 from time: 2.52
10thS   : 4 from time: 0.52
5019.9 T1484 [cmpl] 5018.8: fin

... and emitting the items from only the most-recently emitted of those Observables switch_latest


In [71]:
rst(O.switch_latest)
s = O.range(0, 3).select(lambda x: O.range(x, 3)\
                                  # showing from which stream our current value comes:
                                  .map(lambda v: '%s (from stream nr %s)' % (v, x)))\
                 .switch_latest()
d = subs(s)


function switch_latest of module rx.linq.observable.switchlatest:
Transforms an observable sequence of observable sequences into an
    observable sequence producing values only from the most recent
    observable sequence.

    :returns: The observable sequence that at any point in time produces the
    elements of the most recent inner observable sequence that has been
    received.
    :rtype: Observable
    
--------------------------------------------------------------------------------

   0.9     M New subscription on stream 275998049
   1.8     M [next]    0.8: 0 (from stream nr 0)
   2.9     M [next]    1.9: 1 (from stream nr 1)
   3.5     M [next]    2.5: 2 (from stream nr 2)
   3.9     M [next]    2.9: 3 (from stream nr 2)
   4.0     M [next]    3.0: 4 (from stream nr 2)
   4.2     M [cmpl]    3.2: fin

In [ ]: