In [4]:
%run startup.py
In [5]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')
source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)
This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.
We also require acquaintance with the marble diagrams feature of RxPy.
In [6]:
reset_start_time(O.delay)
d = subs(marble_stream('a-b-c|').delay(150).merge(marble_stream('1-2-3|')))
In [15]:
rst(O.materialize)
def pretty(notif):
# this are the interesting attributes:
return 'kind: %(kind)s, value: %(value)s' % ItemGetter(notif)
d = subs(O.from_((1, 2, 3)).materialize().map(lambda x: pretty(x)))
In [22]:
rst(O.dematerialize)
d = subs(O.from_((1, 2, 3)).materialize().dematerialize())
header('Dematerializing manually created notifs')
d = subs(O.from_((rx.core.notification.OnNext('foo'), rx.core.notification.OnCompleted())).dematerialize())
In [28]:
# Materializing a sequence can be very handy for performing analysis or logging of a sequence.
# You can unwrap a materialized sequence by applying the Dematerialize extension method.
from rx.testing import dump
d = subs(O.range(1, 3).materialize().dump(name='mydump'))
In [18]:
rst(O.ignore_elements)
d = subs(O.range(0, 10).ignore_elements())
In [32]:
rst(O.start_with)
d = subs(O.from_(('a', 'b')).start_with(1, 2, 3))
In [36]:
rst(O.default_if_empty)
# the default here is to emit a None:
d = subs(O.empty().default_if_empty())
d = subs(O.empty().default_if_empty('hello world'))
Very good intro is here
Buffer 'closing' means: The buffer is flushed to the subscriber(s), then next buffer is getting filled.
Note: The used scheduler seems not 100% exact timewise on the marble streams. But you get the idea.
In [7]:
rst(O.buffer)
header('with closing mapper')
# the simplest one:
print('''Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable produced by the specified bufferClosingSelector emits an item.''')
xs = marble_stream('1-2-3-4-5-6-7-8-9|')
# defining when to flush the buffer to the subscribers:
cs = marble_stream('---e--e----------|')
print('\nCalling the closer as is:')
d = subs(xs.buffer(closing_mapper=cs))
sleep(2)
print('\nCalling again and again -> equal buffer sizes flushed')
cs = marble_stream('---e|')
d = subs(xs.buffer(closing_mapper=lambda: cs))
In [37]:
rst(title='with buffer closing mapper')
xs = marble_stream('1-2-3-4-5-6-7-8-9|')
# the more '-' the bigger the emitted buffers.
# Called again and again:
cs = marble_stream('------e|')
cs2 = marble_stream('--e|')
print ('Subscribing two times with different buffer sizes')
d = subs(xs.buffer(buffer_closing_mapper=lambda: cs), name='BIIIIIG bufs')
d = subs(xs.buffer(buffer_closing_mapper=lambda: cs2),name='small bufs')
In [36]:
rst(title='with buffer opening mapper')
xs = marble_stream('1-2-3-4-5-6-7-8-9|')
opens = marble_stream('---o|')
d = subs(xs.buffer(buffer_openings=lambda: opens))
In [35]:
rst(title='with buffer opening and closing mapper')
#TODO: behaviour not really understood. Bug?
xs = marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')
opens = marble_stream('oo---------------------------------------------------|')
closes = marble_stream('-------------------------c|')
d = subs(xs.buffer(buffer_openings=opens, buffer_closing_mapper=lambda: closes))
In [41]:
rst(O.buffer_with_count)
xs = marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')
d = subs(xs.buffer_with_count(2, skip=5))
In [44]:
rst(O.take_last_buffer)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_last_buffer(2))
In [46]:
rst(O.take_with_time)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_with_time(310))
In [47]:
rst(O.take_last_with_time)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_last_with_time(310))
Window is similar to Buffer, but rather than emitting packets of items from the source Observable, it emits Observables, each one of which emits a subset of items from the source Observable and then terminates with an onCompleted notification.
Like Buffer, Window has many varieties, each with its own way of subdividing the original Observable into the resulting Observable emissions, each one of which contains a “window” onto the original emitted items. In the terminology of the Window operator, when a window “opens,” this means that a new Observable is emitted and that Observable will begin emitting items emitted by the source Observable. When a window “closes,” this means that the emitted Observable stops emitting items from the source Observable and terminates with an onCompleted notification to its observers.
from: http://www.introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#Window
A major difference we see here is that the Window operators can notify you of values from the source as soon as they are produced. The Buffer operators, on the other hand, must wait until the window closes before the values can be notified as an entire list.
In [55]:
rst(O.window_with_count, title="window with count")
wid = 0 # window id
def show_stream(window):
global wid
wid += 1
log('starting new window', wid)
# yes we can subscribe normally, its not buffers but observables:
subs(window, name='window id %s' % wid)
src = O.interval(100).take(10).window_with_count(3).map(lambda window: show_stream(window))
d = subs(src, name='outer subscription')
It is left to the reader to explore the other window functions offered by RxPY, working similar to buffer:
In [22]:
rst(O.window, title="window")
rst(O.window_with_time, title="window_with_time(self, timespan, timeshift=None, scheduler=None)")
rst(O.window_with_time_or_count, title="window_with_time_or_count(self, timespan, count, scheduler=None)")
The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.
In [56]:
rst(O.group_by)
keyCode = 'keyCode'
codes = [
{ keyCode: 38}, #// up
{ keyCode: 38}, #// up
{ keyCode: 40}, #// down
{ keyCode: 40}, #// down
{ keyCode: 37}, #// left
{ keyCode: 39}, #// right
{ keyCode: 37}, #// left
{ keyCode: 39}, #// right
{ keyCode: 66}, #// b
{ keyCode: 65} #// a
]
src = O.from_(codes).group_by(
key_mapper = lambda x: x[keyCode], # id of (potentially new) streams
element_mapper = lambda x: x[keyCode] # membership to which stream
)
# we have now 6 streams
src.count().subscribe(lambda total: print ('Total streams:', total))
d = src.subscribe(lambda obs: obs.count().subscribe(lambda x: print ('Count', x)))
In [57]:
rst(O.group_by_until, title='group by (with time intervals)')
src = marble_stream('-(38)-(38)-(40)-(40)-(37)-(39)-(37)-(39)-(66)-(65)-|')
def count(interval):
grouped = src.group_by_until(
key_mapper = lambda x: x, # id of (potentially new) streams
element_mapper = lambda x: x, # membership to which stream
duration_mapper= lambda x: O.timer(interval))
d = grouped.count().subscribe(lambda total: print (
'Distinct elements within %sms: %s' % (interval, total)))
header('grouping interval short')
# now every event is unique, any older stream is forgotten when it occurs:
count(20)
sleep(2)
header('grouping interval medium')
# just enough to detect the directly following doublicates:
count(200)
sleep(2)
header('grouping interval long')
count(1000)
In [50]:
rst(O.first)
d = subs(O.from_((1, 2, 3, 4)).first(lambda x, i: x < 3))
In [48]:
rst(O.single)
# you can also match on the index i:
d = subs(O.from_((1, 2, 3, 4)).single(lambda x, i: (x, i) == (3, 2)))
In [47]:
rst(O.last)
d = subs(O.from_((1, 2, 3, 4)).last(lambda x: x < 3))
In [ ]: