In [4]:
%run startup.py

In [5]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 4: Grouping, Buffering, Delaying, misc

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

We also require acquaintance with the marble diagrams feature of RxPy.

Table of Contents

I want to shift the items emitted by an Observable forward in time before reemitting them

... delay


In [6]:
reset_start_time(O.delay)
d = subs(marble_stream('a-b-c|').delay(150).merge(marble_stream('1-2-3|')))



========== delay ==========

module rx.linq.observable.delay
@extensionmethod(ObservableBase)
def delay(self, duetime, scheduler=None):
    Time shifts the observable sequence by duetime. The relative time
    intervals between the values are preserved.

    1 - res = rx.Observable.delay(datetime())
    2 - res = rx.Observable.delay(datetime(), Scheduler.timeout)

    3 - res = rx.Observable.delay(5000)
    4 - res = rx.Observable.delay(5000, Scheduler.timeout)

    Keyword arguments:
    :param datetime|int duetime: Absolute (specified as a datetime object) or
        relative time (specified as an integer denoting milliseconds) by which
        to shift the observable sequence.
    :param Scheduler scheduler: [Optional] Scheduler to run the delay timers on.
        If not specified, the timeout scheduler is used.

    :returns: Time-shifted sequence.
    :rtype: Observable
--------------------------------------------------------------------------------

   3.0     M New subscription on stream 276458585
  14.7    T4 [next]   11.7: 1
 125.8    T5 [next]  122.8: 2
 169.0   T20 [next]  165.9: a
 235.4    T7 [next]  232.4: 3
 279.2   T21 [next]  276.2: b
 392.2   T22 [next]  389.2: c
 402.1   T23 [cmpl]  399.1: fin

I want to transform items and notifications from an Observable into items and reemit them

... by wrapping them in Notification objects materialize


In [15]:
rst(O.materialize)
def pretty(notif):
    # this are the interesting attributes:
    return 'kind: %(kind)s, value: %(value)s' % ItemGetter(notif)
d = subs(O.from_((1, 2, 3)).materialize().map(lambda x: pretty(x)))


function materialize of module rx.linq.observable.materialize:
Materializes the implicit notifications of an observable sequence as
    explicit notification values.

    Returns an observable sequence containing the materialized notification
    values from the source sequence.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 277716117
   1.4     M [next]    0.5: kind: N, value: 1
   1.8     M [next]    0.9: kind: N, value: 2
   2.3     M [next]    1.3: kind: N, value: 3
   2.5     M [next]    1.5: kind: C, value: None
   2.5     M [cmpl]    1.5: fin

... which I can then unwrap again with dematerialize


In [22]:
rst(O.dematerialize)
d = subs(O.from_((1, 2, 3)).materialize().dematerialize())

header('Dematerializing manually created notifs')
d = subs(O.from_((rx.core.notification.OnNext('foo'), rx.core.notification.OnCompleted())).dematerialize())


function dematerialize of module rx.linq.observable.dematerialize:
Dematerializes the explicit notification values of an observable
    sequence as implicit notifications.

    Returns an observable sequence exhibiting the behavior corresponding to
    the source sequence's notification values.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 276437645
   1.4     M [next]    0.5: 1
   1.8     M [next]    0.8: 2
   2.3     M [next]    1.4: 3
   2.6     M [cmpl]    1.7: fin


========== Dematerializing manually created notifs ==========


   3.3     M New subscription on stream 276437669
   3.6     M [next]    0.3: foo
   3.9     M [cmpl]    0.5: fin

In [28]:
# Materializing a sequence can be very handy for performing analysis or logging of a sequence.
# You can unwrap a materialized sequence by applying the Dematerialize extension method. 
from rx.testing import dump
d = subs(O.range(1, 3).materialize().dump(name='mydump'))


898987.2     M New subscription on stream 276437705
{mydump}-->{OnNext(1)}
898987.9     M [next]    0.7: OnNext(1)
{mydump}-->{OnNext(2)}
898988.5     M [next]    1.3: OnNext(2)
{mydump}-->{OnNext(3)}
898989.0     M [next]    1.7: OnNext(3)
{mydump}-->{OnCompleted()}
898989.5     M [next]    2.2: OnCompleted()
{mydump} completed
898989.8     M [cmpl]    2.5: fin

I want to ignore all items emitted by an Observable and only pass along its completed/error notification


In [18]:
rst(O.ignore_elements)
d = subs(O.range(0, 10).ignore_elements())


function ignore_elements of module rx.linq.observable.ignoreelements:
Ignores all elements in an observable sequence leaving only the
    termination messages.

    Returns an empty observable {Observable} sequence that signals
    termination, successful or exceptional, of the source sequence.
    
--------------------------------------------------------------------------------

   0.5     M New subscription on stream 277710309
   2.2     M [cmpl]    1.6: fin

I want to mirror an Observable but prefix items to its sequence start_with


In [32]:
rst(O.start_with)
d = subs(O.from_(('a', 'b')).start_with(1, 2, 3))


function start_with of module rx.linq.observable.startswith:
Prepends a sequence of values to an observable sequence with an
    optional scheduler and an argument list of values to prepend.

    1 - source.start_with(1, 2, 3)
    2 - source.start_with(Scheduler.timeout, 1, 2, 3)

    Returns the source sequence prepended with the specified values.
    
--------------------------------------------------------------------------------

   1.7     M New subscription on stream 276437529
   2.3     M [next]    0.5: 1
   2.5     M [next]    0.7: 2
   2.8     M [next]    0.9: 3
   3.4     M [next]    1.5: a
   3.6     M [next]    1.8: b
   3.9     M [cmpl]    2.0: fin

... only if its sequence is empty default_if_empty


In [36]:
rst(O.default_if_empty)
# the default here is to emit a None:
d = subs(O.empty().default_if_empty())
d = subs(O.empty().default_if_empty('hello world'))


function default_if_empty of module rx.linq.observable.defaultifempty:
Returns the elements of the specified sequence or the specified value
    in a singleton sequence if the sequence is empty.

    res = obs = xs.defaultIfEmpty()
    obs = xs.defaultIfEmpty(False

    Keyword arguments:
    default_value -- The value to return if the sequence is empty. If not
        provided, this defaults to None.

    Returns an observable {Observable} sequence that contains the specified
    default value if the source is empty otherwise, the elements of the
    source itself.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 276444765
   1.4     M [next]    0.4: None
   1.5     M [cmpl]    0.5: fin

   2.5     M New subscription on stream 276444761
   3.0     M [next]    0.2: hello world
   3.3     M [cmpl]    0.5: fin

I want to collect items from an Observable and reemit them as buffers of items buffer

Very good intro is here
Buffer 'closing' means: The buffer is flushed to the subscriber(s), then next buffer is getting filled.

Note: The used scheduler seems not 100% exact timewise on the marble streams. But you get the idea.


In [7]:
rst(O.buffer)

header('with closing mapper')
# the simplest one:
print('''Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable produced by the specified bufferClosingSelector emits an item.''')
xs = marble_stream('1-2-3-4-5-6-7-8-9|')
# defining when to flush the buffer to the subscribers:
cs = marble_stream('---e--e----------|')
print('\nCalling the closer as is:')
d = subs(xs.buffer(closing_mapper=cs))
sleep(2)
print('\nCalling again and again -> equal buffer sizes flushed')
cs = marble_stream('---e|')
d = subs(xs.buffer(closing_mapper=lambda: cs))


function buffer of module rx.linq.observable.buffer:
Projects each element of an observable sequence into zero or more
    buffers.

    Keyword arguments:
    buffer_openings -- Observable sequence whose elements denote the
        creation of windows.
    closing_mapper -- Or, a function invoked to define the boundaries of
        the produced windows (a window is started when the previous one is
        closed, resulting in non-overlapping windows).
    buffer_closing_mapper -- [optional] A function invoked to define the
        closing of each produced window. If a closing mapper function is
        specified for the first parameter, this parameter is ignored.

    Returns an observable sequence of windows.
    
--------------------------------------------------------------------------------


========== with closing mapper ==========

Returns an Observable that emits buffers of items it collects from the source Observable. The resulting Observable emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a new buffer whenever the Observable produced by the specified bufferClosingSelector emits an item.

Calling the closer as is:

   1.4     M New subscription on stream 276484633
 323.4  T319 [next]  322.1: ['1', '2', '3']
 533.3  T321 [next]  531.9: ['4', '5']
 910.1  T317 [next]  908.8: ['6', '7', '8', '9']
 911.0  T317 [cmpl]  909.6: fin

Calling again and again -> equal buffer sizes flushed

2010.0     M New subscription on stream 276473777
2334.4  T346 [next]  324.4: ['1', '2', '3']
2649.7  T350 [next]  639.7: ['4', '5', '6']
2921.9  T343 [next]  911.9: ['7', '8', '9']
2922.6  T343 [cmpl]  912.6: fin

In [37]:
rst(title='with buffer closing mapper')

xs = marble_stream('1-2-3-4-5-6-7-8-9|')
# the more '-' the bigger the emitted buffers.
# Called again and again:
cs  = marble_stream('------e|')
cs2 = marble_stream('--e|')
print ('Subscribing two times with different buffer sizes')
d = subs(xs.buffer(buffer_closing_mapper=lambda: cs), name='BIIIIIG bufs')
d = subs(xs.buffer(buffer_closing_mapper=lambda: cs2),name='small bufs')



========== with buffer closing mapper ==========

Subscribing two times with different buffer sizes

   1.3     M New subscription on stream 277176113

  12.6     M New subscription on stream 281768273
 234.6 T1826 [next]  221.8: ['1', '2'] (small bufs)
 452.4 T1830 [next]  439.6: ['3', '4'] (small bufs)
 624.7 T1802 [next]  623.3: ['1', '2', '3', '4', '5', '6'] (BIIIIIG bufs)
 668.6 T1834 [next]  655.8: ['5', '6'] (small bufs)
 883.7 T1842 [next]  871.0: ['7', '8'] (small bufs)
 911.4 T1800 [next]  910.1: ['7', '8', '9'] (BIIIIIG bufs)
 912.1 T1800 [cmpl]  910.7: fin (BIIIIIG bufs)
 920.1 T1823 [next]  907.3: ['9'] (small bufs)
 920.7 T1823 [cmpl]  907.9: fin (small bufs)

In [36]:
rst(title='with buffer opening mapper')

xs =     marble_stream('1-2-3-4-5-6-7-8-9|')
opens  = marble_stream('---o|')
d = subs(xs.buffer(buffer_openings=lambda: opens))



========== with buffer opening mapper ==========


   0.8     M New subscription on stream 276484729
 323.2 T1769 [next]  322.3: ['1', '2', '3']
 642.0 T1774 [next]  641.2: ['4', '5', '6']
 911.3 T1768 [next]  910.4: ['7', '8', '9']
 915.0 T1768 [cmpl]  914.2: fin

In [35]:
rst(title='with buffer opening and closing mapper')
#TODO: behaviour not really understood. Bug?
xs =     marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')
opens  = marble_stream('oo---------------------------------------------------|')
closes = marble_stream('-------------------------c|')
d = subs(xs.buffer(buffer_openings=opens, buffer_closing_mapper=lambda: closes))



========== with buffer opening and closing mapper ==========


   1.0     M New subscription on stream 276476373
2530.7 T1741 [next] 2529.6: ['1', '2', '3', '4', '5', '6', '7', '8', '9', '1', '2', '3', '4', '5', '6', '7', '8', '9', '1', '2', '3', '4', '5']
2900.7 T1739 [next] 2899.7: ['6', '7', '8', '9']
2901.9 T1739 [cmpl] 2900.8: fin

... buffering by counts buffer_with_count


In [41]:
rst(O.buffer_with_count)
xs = marble_stream('1-2-3-4-5-6-7-8-9-1-2-3-4-5-6-7-8-9|')
d = subs(xs.buffer_with_count(2, skip=5))


function buffer_with_count of module rx.linq.observable.buffer:
Projects each element of an observable sequence into zero or more
    buffers which are produced based on element count information.

    Example:
    res = xs.buffer_with_count(10)
    res = xs.buffer_with_count(10, 1)

    Keyword parameters:
    count -- {Number} Length of each buffer.
    skip -- {Number} [Optional] Number of elements to skip between creation
        of consecutive buffers. If not provided, defaults to the count.

    Returns an observable {Observable} sequence of buffers.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 276532201
 126.4 T1963 [next]  125.4: ['1', '2']
 676.4 T1973 [next]  675.4: ['6', '7']
1228.0 T1983 [next] 1226.9: ['2', '3']
1782.0 T1993 [next] 1781.0: ['7', '8']
1902.2 T1997 [cmpl] 1901.2: fin

... and take only the last (by count) take_last_buffer


In [44]:
rst(O.take_last_buffer)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_last_buffer(2))


function take_last_buffer of module rx.linq.observable.takelastbuffer:
Returns an array with the specified number of contiguous elements
    from the end of an observable sequence.

    Example:
    res = source.take_last(5)

    Description:
    This operator accumulates a buffer with a length enough to store
    elements count elements. Upon completion of the source sequence, this
    buffer is drained on the result sequence. This causes the elements to be
    delayed.

    Keyword arguments:
    :param int count: Number of elements to take from the end of the source
        sequence.

    :returns: An observable sequence containing a single list with the specified 
    number of elements from the end of the source sequence.
    :rtype: Observable
    
--------------------------------------------------------------------------------

   1.2     M New subscription on stream 276495909
 468.7 T2029 [next]  467.4: ['4', '5']
 469.2 T2029 [cmpl]  468.0: fin

... and take only the first (by time) take_with_time


In [46]:
rst(O.take_with_time)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_with_time(310))


function take_with_time of module rx.linq.observable.takewithtime:
Takes elements for the specified duration from the start of the
    observable source sequence, using the specified scheduler to run timers.

    Example:
    res = source.take_with_time(5000,  [optional scheduler])

    Description:
    This operator accumulates a queue with a length enough to store elements
    received during the initial duration window. As more elements are
    received, elements older than the specified duration are taken from the
    queue and produced on the result sequence. This causes elements to be
    delayed with duration.

    Keyword arguments:
    duration -- {Number} Duration for taking elements from the start of the
        sequence.
    scheduler -- {Scheduler} Scheduler to run the timer on. If not
        specified, defaults to rx.Scheduler.timeout.

    Returns {Observable} An observable sequence with the elements taken
    during the specified duration from the start of the source sequence.
    
--------------------------------------------------------------------------------

   1.2     M New subscription on stream 281756025
  14.0 T2046 [next]   12.6: 1
 125.7 T2048 [next]  124.3: 2
 234.1 T2049 [next]  232.7: 3
 317.1 T2045 [cmpl]  315.7: fin

... or only the last (by time) take_last_with_time


In [47]:
rst(O.take_last_with_time)
xs = marble_stream('1-2-3-4-5|')
d = subs(xs.take_last_with_time(310))


function take_last_with_time of module rx.linq.observable.takelastwithtime:
Returns elements within the specified duration from the end of the
    observable source sequence, using the specified schedulers to run timers
    and to drain the collected elements.

    Example:
    res = source.take_last_with_time(5000, scheduler)

    Description:
    This operator accumulates a queue with a length enough to store elements
    received during the initial duration window. As more elements are
    received, elements older than the specified duration are taken from the
    queue and produced on the result sequence. This causes elements to be
    delayed with duration.

    Keyword arguments:
    duration -- {Number} Duration for taking elements from the end of the
        sequence.
    scheduler -- {Scheduler} [Optional] Scheduler to run the timer on. If
        not specified, defaults to rx.Scheduler.timeout.

    Returns {Observable} An observable sequence with the elements taken
    during the specified duration from the end of the source sequence.
    
--------------------------------------------------------------------------------

   0.8     M New subscription on stream 277184037
 468.7 T2067 [next]  467.7: 3
 469.0 T2067 [next]  468.0: 4
 469.3 T2067 [next]  468.3: 5
 469.4 T2067 [cmpl]  468.4: fin

I want to split one Observable into multiple Observables window

Window is similar to Buffer, but rather than emitting packets of items from the source Observable, it emits Observables, each one of which emits a subset of items from the source Observable and then terminates with an onCompleted notification.

Like Buffer, Window has many varieties, each with its own way of subdividing the original Observable into the resulting Observable emissions, each one of which contains a “window” onto the original emitted items. In the terminology of the Window operator, when a window “opens,” this means that a new Observable is emitted and that Observable will begin emitting items emitted by the source Observable. When a window “closes,” this means that the emitted Observable stops emitting items from the source Observable and terminates with an onCompleted notification to its observers.

from: http://www.introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html#Window

A major difference we see here is that the Window operators can notify you of values from the source as soon as they are produced. The Buffer operators, on the other hand, must wait until the window closes before the values can be notified as an entire list.


In [55]:
rst(O.window_with_count, title="window with count")
wid = 0 # window id
def show_stream(window):
    global wid
    wid += 1
    log('starting new window', wid)
    # yes we can subscribe normally, its not buffers but observables:
    subs(window, name='window id %s' % wid)
    
src = O.interval(100).take(10).window_with_count(3).map(lambda window: show_stream(window))

d = subs(src, name='outer subscription')



========== window with count ==========

function window_with_count of module rx.linq.observable.windowwithcount:
Projects each element of an observable sequence into zero or more
    windows which are produced based on element count information.

    1 - xs.window_with_count(10)
    2 - xs.window_with_count(10, 1)

    count -- Length of each window.
    skip -- [Optional] Number of elements to skip between creation of
        consecutive windows. If not specified, defaults to the count.

    Returns an observable sequence of windows.
    
--------------------------------------------------------------------------------

   1.1     M New subscription on stream 277141053
   2.0     M starting new window 1

   2.4     M New subscription on stream 276132537
   3.0     M [next]    1.6: None (outer subscription)
 106.7 T1844 [next]  104.0: 0 (window id 1)
 212.4 T1845 [next]  209.7: 1 (window id 1)
 313.7 T1846 [next]  310.9: 2 (window id 1)
 314.0 T1846 [cmpl]  311.3: fin (window id 1)
 314.5 T1846 starting new window 2

 314.7 T1846 New subscription on stream 276613473
 315.3 T1846 [next]  314.0: None (outer subscription)
 419.1 T1847 [next]  104.4: 3 (window id 2)
 521.0 T1848 [next]  206.3: 4 (window id 2)
 625.9 T1849 [next]  311.2: 5 (window id 2)
 626.3 T1849 [cmpl]  311.6: fin (window id 2)
 626.6 T1849 starting new window 3

 627.0 T1849 New subscription on stream 276609545
 627.4 T1849 [next]  626.0: None (outer subscription)
 730.2 T1850 [next]  103.2: 6 (window id 3)
 835.8 T1851 [next]  208.8: 7 (window id 3)
 937.1 T1852 [next]  310.1: 8 (window id 3)
 937.5 T1852 [cmpl]  310.4: fin (window id 3)
 937.7 T1852 starting new window 4

 937.9 T1852 New subscription on stream 276550141
 938.2 T1852 [next]  936.8: None (outer subscription)
1039.1 T1853 [next]  101.3: 9 (window id 4)
1039.5 T1853 [cmpl]  101.6: fin (window id 4)
1040.3 T1853 [cmpl] 1038.9: fin (outer subscription)

It is left to the reader to explore the other window functions offered by RxPY, working similar to buffer:


In [22]:
rst(O.window, title="window")
rst(O.window_with_time, title="window_with_time(self, timespan, timeshift=None, scheduler=None)")
rst(O.window_with_time_or_count, title="window_with_time_or_count(self, timespan, count, scheduler=None)")



========== window ==========

function window of module rx.linq.observable.window:
Projects each element of an observable sequence into zero or more
    windows.

    Keyword arguments:
    :param Observable window_openings: Observable sequence whose elements
        denote the creation of windows.
    :param types.FunctionType window_closing_mapper: [Optional] A function
        invoked to define the closing of each produced window. It defines the
        boundaries of the produced windows (a window is started when the
        previous one is closed, resulting in non-overlapping windows).

    :returns: An observable sequence of windows.
    :rtype: Observable[Observable]
    
--------------------------------------------------------------------------------


========== window_with_time(self, timespan, timeshift=None, scheduler=None) ==========

function window_with_time of module rx.linq.observable.windowwithtime:
n.a.
--------------------------------------------------------------------------------


========== window_with_time_or_count(self, timespan, count, scheduler=None) ==========

function window_with_time_or_count of module rx.linq.observable.windowwithtimeorcount:
n.a.
--------------------------------------------------------------------------------

...so that similar items end up on the same Observable group_by

The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.


In [56]:
rst(O.group_by)
keyCode = 'keyCode'
codes = [
    { keyCode: 38}, #// up
    { keyCode: 38}, #// up
    { keyCode: 40}, #// down
    { keyCode: 40}, #// down
    { keyCode: 37}, #// left
    { keyCode: 39}, #// right
    { keyCode: 37}, #// left
    { keyCode: 39}, #// right
    { keyCode: 66}, #// b
    { keyCode: 65}  #// a
]

src = O.from_(codes).group_by(
    key_mapper     = lambda x: x[keyCode], # id of (potentially new) streams
    element_mapper = lambda x: x[keyCode]  # membership to which stream
)
# we have now 6 streams
src.count().subscribe(lambda total: print ('Total streams:', total))
d = src.subscribe(lambda obs: obs.count().subscribe(lambda x: print ('Count', x)))


function group_by of module rx.linq.observable.groupby:
Groups the elements of an observable sequence according to a
    specified key mapper function and comparer and selects the resulting
    elements by using a specified function.

    1 - observable.group_by(lambda x: x.id)
    2 - observable.group_by(lambda x: x.id, lambda x: x.name)
    3 - observable.group_by(
        lambda x: x.id,
        lambda x: x.name,
        lambda x: str(x))

    Keyword arguments:
    key_mapper -- A function to extract the key for each element.
    element_mapper -- [Optional] A function to map each source element to
        an element in an observable group.
    comparer -- {Function} [Optional] Used to determine whether the objects
        are equal.

    Returns a sequence of observable groups, each of which corresponds to a
    unique key value, containing all elements that share that same key
    value.
    
--------------------------------------------------------------------------------
Total streams: 6
Count 2
Count 2
Count 2
Count 2
Count 1
Count 1

In [57]:
rst(O.group_by_until, title='group by (with time intervals)')

src = marble_stream('-(38)-(38)-(40)-(40)-(37)-(39)-(37)-(39)-(66)-(65)-|')

def count(interval):
    grouped = src.group_by_until(
                key_mapper     = lambda x: x,   # id of (potentially new) streams
                element_mapper = lambda x: x,  # membership to which stream
                duration_mapper= lambda x: O.timer(interval))

    d = grouped.count().subscribe(lambda total: print (
            'Distinct elements within %sms: %s' % (interval, total)))


header('grouping interval short')
# now every event is unique, any older stream is forgotten when it occurs:
count(20)
sleep(2)
header('grouping interval medium')
# just enough to detect the directly following doublicates:
count(200)
sleep(2)
header('grouping interval long')
count(1000)



========== group by (with time intervals) ==========

function group_by_until of module rx.linq.observable.groupbyuntil:
Groups the elements of an observable sequence according to a
    specified key mapper function. A duration mapper function is used
    to control the lifetime of groups. When a group expires, it receives
    an OnCompleted notification. When a new element with the same key value
    as a reclaimed group occurs, the group will be reborn with a new
    lifetime request.

    1 - observable.group_by_until(
            lambda x: x.id,
            None,
            lambda : Rx.rx.never()
        )
    2 - observable.group_by_until(
            lambda x: x.id,
            lambda x: x.name,
            lambda: Rx.rx.never()
        )
    3 - observable.group_by_until(
            lambda x: x.id,
            lambda x: x.name,
            lambda:  Rx.rx.never(),
            lambda x: str(x))

    Keyword arguments:
    key_mapper -- A function to extract the key for each element.
    duration_mapper -- A function to signal the expiration of a group.
    comparer -- [Optional] {Function} Used to compare objects. When not
        specified, the default comparer is used. Note: this argument will be
        ignored in the Python implementation of Rx. Python objects knows,
        or should know how to compare themselves.

    Returns a sequence of observable groups, each of which corresponds to
    a unique key value, containing all elements that share that same key
    value. If a group's lifetime expires, a new group with the same key
    value can be created once an element with such a key value is
    encountered.
    
--------------------------------------------------------------------------------


========== grouping interval short ==========

Distinct elements within 20ms: 10


========== grouping interval medium ==========

Distinct elements within 200ms: 8


========== grouping interval long ==========

Distinct elements within 1000ms: 6

I want to retrieve a particular item emitted by an Observable:

... the first item emitted first


In [50]:
rst(O.first)
d = subs(O.from_((1, 2, 3, 4)).first(lambda x, i: x < 3))


function first of module rx.linq.observable.first:
Returns the first element of an observable sequence that satisfies
    the condition in the predicate if present else the first item in the
    sequence.

    Example:
    res = res = source.first()
    res = res = source.first(lambda x: x > 3)

    Keyword arguments:
    predicate -- {Function} [Optional] A predicate function to evaluate for
        elements in the source sequence.

    Returns {Observable} Sequence containing the first element in the
    observable sequence that satisfies the condition in the predicate if
    provided, else the first item in the sequence.
    
--------------------------------------------------------------------------------

   0.6     M New subscription on stream 277141121
   1.4     M [next]    0.6: 1
   1.5     M [cmpl]    0.7: fin

... the sole item it emitted single


In [48]:
rst(O.single)
# you can also match on the index i:
d = subs(O.from_((1, 2, 3, 4)).single(lambda x, i: (x, i) == (3, 2)))


function single of module rx.linq.observable.single:
Returns the only element of an observable sequence that satisfies the
    condition in the optional predicate, and reports an exception if there
    is not exactly one element in the observable sequence.

    Example:
    res = source.single()
    res = source.single(lambda x: x == 42)

    Keyword arguments:
    predicate -- {Function} [Optional] A predicate function to evaluate for
        elements in the source sequence.

    Returns {Observable} Sequence containing the single element in the
    observable sequence that satisfies the condition in the predicate.
    
--------------------------------------------------------------------------------

   0.7     M New subscription on stream 277177261
   1.6     M [next]    0.9: 3
   1.7     M [cmpl]    1.0: fin

... the last item emitted before it completed last


In [47]:
rst(O.last)
d = subs(O.from_((1, 2, 3, 4)).last(lambda x: x < 3))


function last of module rx.linq.observable.last:
Returns the last element of an observable sequence that satisfies the
    condition in the predicate if specified, else the last element.

    Example:
    res = source.last()
    res = source.last(lambda x: x > 3)

    Keyword arguments:
    predicate -- {Function} [Optional] A predicate function to evaluate for
        elements in the source sequence.

    Returns {Observable} Sequence containing the last element in the
    observable sequence that satisfies the condition in the predicate.
    
--------------------------------------------------------------------------------

   0.6     M New subscription on stream 276611193
   2.1     M [next]    1.5: 2
   2.4     M [cmpl]    1.7: fin

In [ ]: