In [4]:
# Execute startup.py inside this kernel's namespace. The helper names used by
# the cells below (O, subs, rst, ...) are presumably defined there - TODO confirm.
%run startup.py
In [5]:
%%javascript
// Fetch and execute the notebook table-of-contents script from the local assets folder.
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
This part also assumes familiarity with the marble diagrams feature of RxPY.
In [3]:
reset_start_time(O.map, title='map')  # "select" is an alias of map
# Warm-up example: every item of the source tuple is multiplied by two.
d = subs(
    O.from_((1, 2, 3))
    .map(lambda item: item * 2)
)
In [3]:
rst(O.pluck, title='pluck')
# pluck: read the value stored under the key 'y' of every emitted dict.
d = subs(O.from_([{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]).pluck('y'))


class Coord(object):
    """Minimal point-like object exposing ``x`` and ``y`` as attributes.

    Declared as a new-style class (explicit ``object`` base) so that it
    behaves the same way under Python 2.7 and Python 3.
    """

    def __init__(self, x, y):
        self.x = x
        self.y = y


rst(title='pluck_attr')
# pluck_attr: same idea as above, but reading the attribute 'y' of every emitted object.
d = subs(O.from_([Coord(1, 2), Coord(3, 4)]).pluck_attr('y'))
In [4]:
rst(O.flat_map)
# Every item of the outer range is mapped to an inner range starting at that
# item; flat_map merges the inner streams into one.  (alias: select_many)
stream = O.range(1, 2).flat_map(lambda start: O.range(start, 2))
d = subs(stream)
In [8]:
rst()  # from an array
s1 = O.from_(('a', 'b', 'c'))
# Selector that returns the item itself:
d = subs(s1.flat_map(lambda x: x))
# Selector that also receives the index and returns an (item, index) tuple:
d = subs(s1.flat_map(lambda x, i: (x, i)))
# Disabled variant, kept for reference (original note: ident, a string is iterable):
#d = subs(O.from_(('a', 'b', 'c')).flat_map(lambda x, i: '%s%s' % (x, i)))

header('using a result mapper')


def res_sel(*received):
    """Join the string form of every positional argument with '-'."""
    # In contrast to the RxJS example, only 3 parameters arrive here (see output).
    return '-'.join(map(str, received))


# For every element of the original stream we get *additional* two elements:
# the element and its index:
d = subs(s1.flat_map(lambda x, i: (x, i), res_sel))
# ident(ical): flat_map flattens the inner stream:
d = subs(s1.flat_map(lambda x, i: O.from_((x, i)), res_sel))
In [29]:
rst(O.flat_map_latest)  # alias: select_switch
d = subs(O.range(1, 2).flat_map_latest(lambda x: O.range(x, 2)))

# Perhaps easier to follow: the outer items A, B, C are always emitted more
# recently than the elements of the inner streams.
d = subs(O.from_(('A', 'B', 'C')).flat_map_latest(
    lambda x, i: O.from_(tuple(
        '%s%s-%s' % (x, i, suffix) for suffix in ('a', 'b', 'c')
    ))
))

# With emission delays: now the inner stream is the faster one.
outer = O.from_marbles('A--B--C|').to_blocking()
inner = O.from_marbles('a-b-c|').to_blocking()
# The inner .map prefixes each inner value with the outer value, so both are visible.
d = subs(
    outer.flat_map_latest(
        lambda X: inner.map(lambda x: '%s%s' % (X, x))
    )
)
In [4]:
rst(O.for_in)
abc = O.from_marbles('a-b|').to_blocking()


def _abc_tagged_with(i):
    """Return the abc stream with ``i`` appended to every letter.

    The suffix only serves to show both the source element and the
    stream element in the output.
    """
    return abc.map(lambda letter: '%s%s' % (letter, i))


# abc times 3, once per element of the list:
d = subs(O.for_in([1, 2, 3], _abc_tagged_with))
sleep(0.5)
# We can also for_in from an observable.
# TODO: Don't understand the output though - __doc__ says only arrays.
d = subs(O.for_in(O.from_((1, 2, 3)), _abc_tagged_with).take(2))
manySelect internally transforms each item emitted by the source Observable into an Observable that emits that item and all items subsequently emitted by the source Observable, in the same order.
So, for example, it internally transforms an Observable that emits the numbers 1,2,3 into three Observables: one that emits 1,2,3, one that emits 2,3, and one that emits 3.
Then manySelect passes each of these Observables into a function that you provide, and emits, as the emissions from the Observable that manySelect returns, the return values from those function calls.
In this way, each item emitted by the resulting Observable is a function of the corresponding item in the source Observable and all of the items emitted by the source Observable after it.
In [25]:
rst(O.many_select)
stream = O.from_marbles('a-b-c|')
# TODO: more use cases
# The selector keeps only the first item of each observable handed to it;
# merge_all then flattens the selector results into a single stream.
d = subs(
    stream
    .many_select(lambda tail: tail.first())
    .merge_all()
)
In [32]:
rst(O.scan)
# NOTE(review): this marble string has no closing '|' - confirm that is intended.
s = O.from_marbles("1-2-3-4---5").to_blocking()
# Running sum, starting from the seed; both operands are passed through int()
# before they are added.
d = subs(
    s.scan(lambda acc, item: int(acc) + int(item), seed=10000)
)
In [38]:
rst(O.timestamp)
# The timestamped items are objects, not dicts, hence pluck_attr rather than pluck:
d = subs(
    marble_stream('a-b-c|')
    .timestamp()
    .pluck_attr('timestamp')
)
In [44]:
rst(O.time_interval)
# Show only the .interval attribute of each item produced by time_interval():
d = subs(
    marble_stream('a-b--c|')
    .time_interval()
    .map(lambda item: item.interval)
)
In [ ]: