About

  • An API for asynchronous programming with observable streams

  • ReactiveX, or Rx for short, is an API for programming with observable event streams.

  • The Observer pattern done right.

  • ReactiveX is a combination of the best ideas from:

    • the Observer pattern,
    • the Iterator pattern, and
    • functional programming.

http://reactivex.io/
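
The three ideas in the list above can be sketched without the library. The class below, `TinyObservable`, is a hypothetical stand-in written for illustration only; it is not part of `rx` and leaves out errors, completion, and scheduling. It pushes items from an iterable (Iterator pattern) to a callback (Observer pattern) through composable `map` and `filter` steps (functional programming).

```python
from typing import Callable, Iterable, List


class TinyObservable:
    """Hypothetical stand-in for an observable stream; not the rx class."""

    def __init__(self, subscribe: Callable[[Callable[[object], None]], None]):
        self._subscribe = subscribe

    @classmethod
    def from_iterable(cls, items: Iterable) -> "TinyObservable":
        # Iterator pattern: any iterable is the source, pushed item by item.
        def subscribe(on_next):
            for item in items:
                on_next(item)
        return cls(subscribe)

    def map(self, fn) -> "TinyObservable":
        # Functional style: build a new stream and leave the source untouched.
        return TinyObservable(
            lambda on_next: self._subscribe(lambda x: on_next(fn(x)))
        )

    def filter(self, predicate) -> "TinyObservable":
        def subscribe(on_next):
            # Forward only the values that pass the predicate.
            self._subscribe(lambda x: on_next(x) if predicate(x) else None)
        return TinyObservable(subscribe)

    def subscribe(self, on_next) -> None:
        # Observer pattern: the callback is notified of every value.
        self._subscribe(on_next)


received: List[int] = []
(TinyObservable.from_iterable(range(6))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 10)
    .subscribe(received.append))
print(received)  # [0, 20, 40]
```

Nothing runs until `subscribe` is called; the `filter` and `map` calls only describe the pipeline.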


In [1]:
from ipywidgets import interact, interactive, fixed
import ipywidgets as widgets

In [2]:
import rx

In [3]:
rx


Out[3]:
<module 'rx' from 'E:\\Miniconda3\\lib\\site-packages\\rx\\__init__.py'>

In [4]:
from rx import Observable, Observer

In [5]:
help(Observer)


Help on class Observer in module rx.core.py3.observer:

class Observer(builtins.object)
 |  Methods defined here:
 |  
 |  as_observer(self)
 |      Hides the identity of an observer.
 |      
 |      Returns an observer that hides the identity of the specified observer.
 |  
 |  checked(self)
 |      Checks access to the observer for grammar violations. This includes
 |      checking for multiple OnError or OnCompleted calls, as well as
 |      reentrancy in any of the observer methods. If a violation is detected,
 |      an Error is thrown from the offending observer method call.
 |      
 |      Returns an observer that checks callbacks invocations against the
 |      observer grammar and, if the checks pass, forwards those to the
 |      specified observer.
 |  
 |  on_completed(self)
 |  
 |  on_error(self, error)
 |  
 |  on_next(self, value)
 |  
 |  to_notifier(self)
 |      Creates a notification callback from an observer.
 |      
 |      Returns the action that forwards its input notification to the
 |      underlying observer.
 |  
 |  ----------------------------------------------------------------------
 |  Class methods defined here:
 |  
 |  from_notifier(handler) from abc.ABCMeta
 |      Creates an observer from a notification callback.
 |      
 |      Keyword arguments:
 |      :param handler: Action that handles a notification.
 |      
 |      :returns: The observer object that invokes the specified handler using a
 |      notification corresponding to each message it receives.
 |      :rtype: Observer
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)
 |  
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |  
 |  __abstractmethods__ = frozenset({'on_completed', 'on_error', 'on_next'...
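
The help text above lists `on_next`, `on_error` and `on_completed` as abstract methods, so a concrete observer has to supply all three. The sketch below shows that shape with a stand-in base class, `ObserverBase`, built on the standard `abc` module. Both `ObserverBase` and `CollectingObserver` are hypothetical names used here so the example runs without `rx` installed; they are not the library's classes.

```python
from abc import ABC, abstractmethod


class ObserverBase(ABC):
    """Hypothetical stand-in mirroring the three abstract methods listed above."""

    @abstractmethod
    def on_next(self, value): ...

    @abstractmethod
    def on_error(self, error): ...

    @abstractmethod
    def on_completed(self): ...


class CollectingObserver(ObserverBase):
    """Records every notification it receives."""

    def __init__(self):
        self.events = []

    def on_next(self, value):
        self.events.append(("next", value))

    def on_error(self, error):
        self.events.append(("error", str(error)))

    def on_completed(self):
        self.events.append(("completed", None))


# An abstract base cannot be instantiated until all three methods exist.
try:
    ObserverBase()
    abstract_ok = True
except TypeError:
    abstract_ok = False

observer = CollectingObserver()
for v in (1, 2):
    observer.on_next(v)
observer.on_completed()
print(abstract_ok, observer.events)
```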


In [6]:
dir(Observer)


Out[6]:
['__abstractmethods__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_abc_cache',
 '_abc_negative_cache',
 '_abc_negative_cache_version',
 '_abc_registry',
 'as_observer',
 'checked',
 'from_notifier',
 'on_completed',
 'on_error',
 'on_next',
 'to_notifier']
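
The `checked` entry in the listing above is documented as guarding the observer grammar: any number of `on_next` calls, followed by at most one `on_error` or `on_completed`, with no reentrancy. The following is a minimal sketch of that idea, not the library's implementation. `CheckedObserver` and `GrammarError` are hypothetical names introduced for this example.

```python
class GrammarError(Exception):
    """Hypothetical error type for this sketch."""


class CheckedObserver:
    """Hypothetical wrapper enforcing on_next* (on_error | on_completed)?"""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self._on_next = on_next
        self._on_error = on_error or (lambda e: None)
        self._on_completed = on_completed or (lambda: None)
        self._terminated = False
        self._busy = False

    def _enter(self):
        # Reject calls made from inside another callback or after termination.
        if self._busy:
            raise GrammarError("reentrant call")
        if self._terminated:
            raise GrammarError("observer already terminated")
        self._busy = True

    def on_next(self, value):
        self._enter()
        try:
            self._on_next(value)
        finally:
            self._busy = False

    def on_error(self, error):
        self._enter()
        try:
            self._terminated = True
            self._on_error(error)
        finally:
            self._busy = False

    def on_completed(self):
        self._enter()
        try:
            self._terminated = True
            self._on_completed()
        finally:
            self._busy = False


seen = []
obs = CheckedObserver(seen.append, on_completed=lambda: seen.append("done"))
obs.on_next(1)
obs.on_completed()
try:
    obs.on_completed()  # second termination violates the grammar
    violation = None
except GrammarError as exc:
    violation = str(exc)
print(seen, violation)
```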

In [7]:
help(Observable)


Help on class Observable in module rx.core.py3.observable:

class Observable(builtins.object)
 |  Methods defined here:
 |  
 |  __add__(self, other)
 |      Pythonic version of concat
 |      
 |      Example:
 |      zs = xs + ys
 |      Returns self.concat(other)
 |  
 |  __await__(self)
 |      Awaits the given observable
 |      :returns: The last item of the observable sequence.
 |      :rtype: Any
 |      :raises TypeError: If key is not of type int or slice
 |  
 |  __getitem__(self, key)
 |      Slices the given observable using Python slice notation. The
 |       arguments to slice are start, stop and step given within brackets [] and
 |       separated with the ':' character. It is basically a wrapper around the
 |       operators skip(), skip_last(), take(), take_last() and filter().
 |      
 |       This marble diagram helps you remember how slices work with streams.
 |       Positive numbers are relative to the start of the events, while negative
 |       numbers are relative to the end (on_completed) of the stream.
 |      
 |       r---e---a---c---t---i---v---e---|
 |       0   1   2   3   4   5   6   7   8
 |      -8  -7  -6  -5  -4  -3  -2  -1
 |      
 |       Example:
 |       result = source[1:10]
 |       result = source[1:-2]
 |       result = source[1:-1:2]
 |      
 |       Keyword arguments:
 |       :param Observable self: Observable to slice
 |       :param slice key: Slice object
 |      
 |       :returns: A sliced observable sequence.
 |       :rtype: Observable
 |       :raises TypeError: If key is not of type int or slice
 |  
 |  __iadd__(self, other)
 |      Pythonic use of concat
 |      
 |      Example:
 |      xs += ys
 |      
 |      Returns self.concat(self, other)
 |  
 |  __mul__(self, b)
 |      Pythonic version of repeat
 |      
 |      Example:
 |      yx = xs * 5
 |      
 |      Returns self.repeat(b)
 |  
 |  aggregate = reduce(self, accumulator, seed=None)
 |  
 |  all(self, predicate)
 |      Determines whether all elements of an observable sequence satisfy a
 |      condition.
 |      
 |      1 - res = source.all(lambda value: value.length > 3)
 |      
 |      Keyword arguments:
 |      :param bool predicate: A function to test each element for a condition.
 |      
 |      :returns: An observable sequence containing a single element determining
 |      whether all elements in the source sequence pass the test in the
 |      specified predicate.
 |  
 |  and_(self, right)
 |      Creates a pattern that matches when both observable sequences
 |      have an available value.
 |      
 |      :param Observable right: Observable sequence to match with the
 |          current sequence.
 |      :returns: Pattern object that matches when both observable sequences
 |          have an available value.
 |  
 |  as_observable(self)
 |      Hides the identity of an observable sequence.
 |      
 |      :returns: An observable sequence that hides the identity of the source
 |          sequence.
 |      :rtype: Observable
 |  
 |  average(self, key_selector=None)
 |      Computes the average of an observable sequence of values that are in
 |      the sequence or obtained by invoking a transform function on each
 |      element of the input sequence if present.
 |      
 |      Example
 |      res = source.average();
 |      res = source.average(lambda x: x.value)
 |      
 |      :param Observable self: Observable to average.
 |      :param types.FunctionType key_selector: A transform function to apply to
 |          each element.
 |      
 |      :returns: An observable sequence containing a single element with the
 |          average of the sequence of values.
 |      :rtype: Observable
 |  
 |  buffer(self, buffer_openings=None, closing_selector=None, buffer_closing_selector=None)
 |      Projects each element of an observable sequence into zero or more
 |      buffers.
 |      
 |      Keyword arguments:
 |      buffer_openings -- Observable sequence whose elements denote the
 |          creation of windows.
 |      closing_selector -- Or, a function invoked to define the boundaries of
 |          the produced windows (a window is started when the previous one is
 |          closed, resulting in non-overlapping windows).
 |      buffer_closing_selector -- [optional] A function invoked to define the
 |          closing of each produced window. If a closing selector function is
 |          specified for the first parameter, self parameter is ignored.
 |      
 |      Returns an observable sequence of windows.
 |  
 |  buffer_with_count(self, count, skip=None)
 |      Projects each element of an observable sequence into zero or more
 |      buffers which are produced based on element count information.
 |      
 |      Example:
 |      res = xs.buffer_with_count(10)
 |      res = xs.buffer_with_count(10, 1)
 |      
 |      Keyword parameters:
 |      count -- {Number} Length of each buffer.
 |      skip -- {Number} [Optional] Number of elements to skip between creation
 |          of consecutive buffers. If not provided, defaults to the count.
 |      
 |      Returns an observable {Observable} sequence of buffers.
 |  
 |  buffer_with_time(self, timespan, timeshift=None, scheduler=None)
 |      Projects each element of an observable sequence into zero or more
 |      buffers which are produced based on timing information.
 |      
 |      # non-overlapping segments of 1 second
 |      1 - res = xs.buffer_with_time(1000)
 |      # segments of 1 second with time shift 0.5 seconds
 |      2 - res = xs.buffer_with_time(1000, 500)
 |      
 |      Keyword arguments:
 |      timespan -- Length of each buffer (specified as an integer denoting
 |          milliseconds).
 |      timeshift -- [Optional] Interval between creation of consecutive
 |          buffers (specified as an integer denoting milliseconds), or an
 |          optional scheduler parameter. If not specified, the time shift
 |          corresponds to the timespan parameter, resulting in non-overlapping
 |          adjacent buffers.
 |      scheduler -- [Optional] Scheduler to run buffer timers on. If not
 |          specified, the timeout scheduler is used.
 |      
 |      Returns an observable sequence of buffers.
 |  
 |  buffer_with_time_or_count(self, timespan, count, scheduler)
 |      Projects each element of an observable sequence into a buffer that
 |      is completed when either it's full or a given amount of time has
 |      elapsed.
 |      
 |      # 5s or 50 items in an array
 |      1 - res = source.buffer_with_time_or_count(5000, 50)
 |      # 5s or 50 items in an array
 |      2 - res = source.buffer_with_time_or_count(5000, 50, Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      timespan -- Maximum time length of a buffer.
 |      count -- Maximum element count of a buffer.
 |      scheduler -- [Optional] Scheduler to run buffering timers on. If not
 |          specified, the timeout scheduler is used.
 |      
 |      Returns an observable sequence of buffers.
 |  
 |  concat_all(self)
 |      Concatenates an observable sequence of observable sequences.
 |      
 |      Returns an observable sequence that contains the elements of each
 |      observed inner sequence, in sequential order.
 |  
 |  contains(self, value, comparer=None)
 |      Determines whether an observable sequence contains a specified
 |      element with an optional equality comparer.
 |      
 |      Example
 |      1 - res = source.contains(42)
 |      2 - res = source.contains({ "value": 42 }, lambda x, y: x["value"] == y["value"])
 |      
 |      Keyword parameters:
 |      value -- The value to locate in the source sequence.
 |      comparer -- {Function} [Optional] An equality comparer to compare elements.
 |      
 |      Returns an observable {Observable} sequence containing a single element
 |      determining whether the source sequence contains an element that has
 |      the specified value.
 |  
 |  controlled(self, enable_queue=True, scheduler=None)
 |      Attach a controller to the observable sequence
 |      
 |      Attach a controller to the observable sequence with the ability to
 |      queue.
 |      
 |      Example:
 |      source = rx.Observable.interval(100).controlled()
 |      source.request(3) # Reads 3 values
 |      
 |      Keyword arguments:
 |      :param bool enable_queue: truthy value to determine if values should
 |          be queued pending the next request
 |      :param Scheduler scheduler: determines how the requests will be scheduled
 |      :returns: The observable sequence which only propagates values on request.
 |      :rtype: Observable
 |  
 |  count(self, predicate=None)
 |      Returns an observable sequence containing a value that represents
 |      how many elements in the specified observable sequence satisfy a
 |      condition if provided, else the count of items.
 |      
 |      1 - res = source.count()
 |      2 - res = source.count(lambda x: x > 3)
 |      
 |      Keyword arguments:
 |      :param types.FunctionType predicate: A function to test each element for a
 |          condition.
 |      
 |      :returns: An observable sequence containing a single element with a
 |      number that represents how many elements in the input sequence satisfy
 |      the condition in the predicate function if provided, else the count of
 |      items in the sequence.
 |      :rtype: Observable
 |  
 |  debounce(self, duetime, scheduler=None)
 |      Ignores values from an observable sequence which are followed by
 |      another value before duetime.
 |      
 |      Example:
 |      1 - res = source.debounce(5000) # 5 seconds
 |      2 - res = source.debounce(5000, scheduler)
 |      
 |      Keyword arguments:
 |      duetime -- {Number} Duration of the throttle period for each value
 |          (specified as an integer denoting milliseconds).
 |      scheduler -- {Scheduler} [Optional]  Scheduler to run the throttle
 |          timers on. If not specified, the timeout scheduler is used.
 |      
 |      Returns {Observable} The debounced sequence.
 |  
 |  default_if_empty(self, default_value=None)
 |      Returns the elements of the specified sequence or the specified value
 |      in a singleton sequence if the sequence is empty.
 |      
 |      res = obs = xs.default_if_empty()
 |      obs = xs.default_if_empty(False)
 |      
 |      Keyword arguments:
 |      default_value -- The value to return if the sequence is empty. If not
 |          provided, this defaults to None.
 |      
 |      Returns an observable {Observable} sequence that contains the specified
 |      default value if the source is empty otherwise, the elements of the
 |      source itself.
 |  
 |  delay(self, duetime, scheduler=None)
 |      Time shifts the observable sequence by duetime. The relative time
 |      intervals between the values are preserved.
 |      
 |      1 - res = rx.Observable.delay(datetime())
 |      2 - res = rx.Observable.delay(datetime(), Scheduler.timeout)
 |      
 |      3 - res = rx.Observable.delay(5000)
 |      4 - res = rx.Observable.delay(5000, Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      :param datetime|int duetime: Absolute (specified as a datetime object) or
 |          relative time (specified as an integer denoting milliseconds) by which
 |          to shift the observable sequence.
 |      :param Scheduler scheduler: [Optional] Scheduler to run the delay timers on.
 |          If not specified, the timeout scheduler is used.
 |      
 |      :returns: Time-shifted sequence.
 |      :rtype: Observable
 |  
 |  delay_subscription(self, duetime, scheduler=None)
 |      Time shifts the observable sequence by delaying the subscription.
 |      
 |      1 - res = source.delay_subscription(5000) # 5s
 |      2 - res = source.delay_subscription(5000, Scheduler.timeout) # 5 seconds
 |      
 |      duetime -- Absolute or relative time to perform the subscription at.
 |      scheduler [Optional] Scheduler to run the subscription delay timer on.
 |          If not specified, the timeout scheduler is used.
 |      
 |      Returns time-shifted sequence.
 |  
 |  delay_with_selector(self, subscription_delay=None, delay_duration_selector=None)
 |      Time shifts the observable sequence based on a subscription delay
 |      and a delay selector function for each element.
 |      
 |      # with selector only
 |      1 - res = source.delay_with_selector(lambda x: Scheduler.timer(5000))
 |      # with delay and selector
 |      2 - res = source.delay_with_selector(Observable.timer(2000),
 |                                           lambda x: Observable.timer(x))
 |      
 |      subscription_delay -- [Optional] Sequence indicating the delay for the
 |          subscription to the source.
 |      delay_duration_selector [Optional] Selector function to retrieve a
 |          sequence indicating the delay for each given element.
 |      
 |      Returns time-shifted sequence.
 |  
 |  dematerialize(self)
 |      Dematerializes the explicit notification values of an observable
 |      sequence as implicit notifications.
 |      
 |      Returns an observable sequence exhibiting the behavior corresponding to
 |      the source sequence's notification values.
 |  
 |  distinct(self, key_selector=None, comparer=None)
 |      Returns an observable sequence that contains only distinct elements
 |      according to the key_selector and the comparer. Usage of this operator
 |      should be considered carefully due to the maintenance of an internal
 |      lookup structure which can grow large.
 |      
 |      Example:
 |      res = obs = xs.distinct()
 |      obs = xs.distinct(lambda x: x.id)
 |      obs = xs.distinct(lambda x: x.id, lambda a,b: a == b)
 |      
 |      Keyword arguments:
 |      key_selector -- {Function} [Optional]  A function to compute the
 |          comparison key for each element.
 |      comparer -- {Function} [Optional]  Used to compare items in the
 |          collection.
 |      
 |      Returns an observable {Observable} sequence only containing the distinct
 |      elements, based on a computed key value, from the source sequence.
 |  
 |  distinct_until_changed(self, key_selector=None, comparer=None)
 |      Returns an observable sequence that contains only distinct
 |      contiguous elements according to the key_selector and the comparer.
 |      
 |      1 - obs = observable.distinct_until_changed();
 |      2 - obs = observable.distinct_until_changed(lambda x: x.id)
 |      3 - obs = observable.distinct_until_changed(lambda x: x.id,
 |                                                  lambda x, y: x == y)
 |      
 |      key_selector -- [Optional] A function to compute the comparison key for
 |          each element. If not provided, it projects the value.
 |      comparer -- [Optional] Equality comparer for computed key values. If
 |          not provided, defaults to an equality comparer function.
 |      
 |      Return An observable sequence only containing the distinct contiguous
 |      elements, based on a computed key value, from the source sequence.
 |  
 |  do_action(self, on_next=None, on_error=None, on_completed=None, observer=None)
 |      Invokes an action for each element in the observable sequence and
 |      invokes an action upon graceful or exceptional termination of the
 |      observable sequence. This method can be used for debugging, logging,
 |      etc. of query behavior by intercepting the message stream to run
 |      arbitrary actions for messages on the pipeline.
 |      
 |      1 - observable.do_action(observer)
 |      2 - observable.do_action(on_next)
 |      3 - observable.do_action(on_next, on_error)
 |      4 - observable.do_action(on_next, on_error, on_completed)
 |      
 |      observer -- [Optional] Observer, or ...
 |      on_next -- [Optional] Action to invoke for each element in the
 |          observable sequence.
 |      on_error -- [Optional] Action to invoke upon exceptional termination
 |          of the observable sequence.
 |      on_completed -- [Optional] Action to invoke upon graceful termination
 |          of the observable sequence.
 |      
 |      Returns the source sequence with the side-effecting behavior applied.
 |  
 |  do_while(self, condition)
 |      Repeats source as long as condition holds emulating a do while loop.
 |      
 |      Keyword arguments:
 |      condition -- {Function} The condition which determines if the source
 |          will be repeated.
 |      
 |      Returns an observable {Observable} sequence which is repeated as long
 |      as the condition holds.
 |  
 |  element_at(self, index)
 |      Returns the element at a specified index in a sequence.
 |      
 |      Example:
 |      res = source.element_at(5)
 |      
 |      Keyword arguments:
 |      :param int index: The zero-based index of the element to retrieve.
 |      
 |      :returns: An observable  sequence that produces the element at the
 |      specified position in the source sequence.
 |      :rtype: Observable
 |  
 |  element_at_or_default(self, index, default_value=None)
 |      Returns the element at a specified index in a sequence or a default
 |      value if the index is out of range.
 |      
 |      Example:
 |      res = source.element_at_or_default(5)
 |      res = source.element_at_or_default(5, 0)
 |      
 |      Keyword arguments:
 |      index -- {Number} The zero-based index of the element to retrieve.
 |      default_value -- [Optional] The default value if the index is outside
 |          the bounds of the source sequence.
 |      
 |      Returns an observable {Observable} sequence that produces the element at
 |          the specified position in the source sequence, or a default value if
 |          the index is outside the bounds of the source sequence.
 |  
 |  every = all(self, predicate)
 |  
 |  exclusive(self)
 |      Performs an exclusive wait for the first to finish before
 |      subscribing to another observable. Observables that come in between
 |      subscriptions will be dropped on the floor.
 |      
 |      Returns an exclusive observable {Observable} with only the results that
 |      happen when subscribed.
 |  
 |  expand(self, selector, scheduler=None)
 |      Expands an observable sequence by recursively invoking selector.
 |      
 |      selector -- {Function} Selector function to invoke for each produced
 |          element, resulting in another sequence to which the selector will be
 |          invoked recursively again.
 |      scheduler -- {Scheduler} [Optional] Scheduler on which to perform the
 |          expansion. If not provided, this defaults to the current thread
 |          scheduler.
 |      
 |      Returns an observable {Observable} sequence containing all the elements
 |      produced by the recursive expansion.
 |  
 |  filter = where(self, predicate)
 |  
 |  finally_action(self, action)
 |      Invokes a specified action after the source observable sequence
 |      terminates gracefully or exceptionally.
 |      
 |      Example:
 |      res = observable.finally_action(lambda: print('sequence ended'))
 |      
 |      Keyword arguments:
 |      action -- {Function} Action to invoke after the source observable
 |          sequence terminates.
 |      Returns {Observable} Source sequence with the action-invoking
 |      termination behavior applied.
 |  
 |  find(self, predicate)
 |      Searches for an element that matches the conditions defined by the
 |      specified predicate, and returns the first occurrence within the entire
 |      Observable sequence.
 |      
 |      Keyword arguments:
 |      predicate -- {Function} The predicate that defines the conditions of the
 |          element to search for.
 |      
 |      Returns an Observable {Observable} sequence with the first element that
 |      matches the conditions defined by the specified predicate, if found
 |      otherwise, None.
 |  
 |  find_index(self, predicate)
 |      Searches for an element that matches the conditions defined by the
 |      specified predicate, and returns an Observable sequence with the
 |      zero-based index of the first occurrence within the entire Observable
 |      sequence.
 |      
 |      Keyword Arguments:
 |      predicate -- {Function} The predicate that defines the conditions of the
 |          element to search for.
 |      
 |      Returns an observable {Observable} sequence with the zero-based index of
 |      the first occurrence of an element that matches the conditions defined
 |      by match, if found; otherwise, -1.
 |  
 |  first(self, predicate=None)
 |      Returns the first element of an observable sequence that satisfies
 |      the condition in the predicate if present else the first item in the
 |      sequence.
 |      
 |      Example:
 |      res = source.first()
 |      res = source.first(lambda x: x > 3)
 |      
 |      Keyword arguments:
 |      predicate -- {Function} [Optional] A predicate function to evaluate for
 |          elements in the source sequence.
 |      
 |      Returns {Observable} Sequence containing the first element in the
 |      observable sequence that satisfies the condition in the predicate if
 |      provided, else the first item in the sequence.
 |  
 |  first_or_default(self, predicate=None, default_value=None)
 |      Returns the first element of an observable sequence that satisfies
 |      the condition in the predicate, or a default value if no such element
 |      exists.
 |      
 |      Example:
 |      res = source.first_or_default()
 |      res = source.first_or_default(lambda x: x > 3)
 |      res = source.first_or_default(lambda x: x > 3, 0)
 |      res = source.first_or_default(None, 0)
 |      
 |      Keyword arguments:
 |      predicate -- {Function} [optional] A predicate function to evaluate for
 |          elements in the source sequence.
 |      default_value -- {Any} [Optional] The default value if no such element
 |          exists.  If not specified, defaults to None.
 |      
 |      Returns {Observable} Sequence containing the first element in the
 |      observable sequence that satisfies the condition in the predicate, or a
 |      default value if no such element exists.
 |  
 |  flat_map = select_many(self, selector, result_selector=None)
 |  
 |  flat_map_latest = select_switch(self, selector)
 |  
 |  group_by(self, key_selector, element_selector=None, key_serializer=None)
 |      Groups the elements of an observable sequence according to a
 |      specified key selector function and comparer and selects the resulting
 |      elements by using a specified function.
 |      
 |      1 - observable.group_by(lambda x: x.id)
 |      2 - observable.group_by(lambda x: x.id, lambda x: x.name)
 |      3 - observable.group_by(
 |          lambda x: x.id,
 |          lambda x: x.name,
 |          lambda x: str(x))
 |      
 |      Keyword arguments:
 |      key_selector -- A function to extract the key for each element.
 |      element_selector -- [Optional] A function to map each source element to
 |          an element in an observable group.
 |      comparer -- {Function} [Optional] Used to determine whether the objects
 |          are equal.
 |      
 |      Returns a sequence of observable groups, each of which corresponds to a
 |      unique key value, containing all elements that share that same key
 |      value.
 |  
 |  group_by_until(self, key_selector, element_selector, duration_selector, comparer=None)
 |      Groups the elements of an observable sequence according to a
 |      specified key selector function. A duration selector function is used
 |      to control the lifetime of groups. When a group expires, it receives
 |      an OnCompleted notification. When a new element with the same key value
 |      as a reclaimed group occurs, the group will be reborn with a new
 |      lifetime request.
 |      
 |      1 - observable.group_by_until(
 |              lambda x: x.id,
 |              None,
 |              lambda : Rx.Observable.never()
 |          )
 |      2 - observable.group_by_until(
 |              lambda x: x.id,
 |              lambda x: x.name,
 |              lambda: Rx.Observable.never()
 |          )
 |      3 - observable.group_by_until(
 |              lambda x: x.id,
 |              lambda x: x.name,
 |              lambda:  Rx.Observable.never(),
 |              lambda x: str(x))
 |      
 |      Keyword arguments:
 |      key_selector -- A function to extract the key for each element.
 |      duration_selector -- A function to signal the expiration of a group.
 |      comparer -- [Optional] {Function} Used to compare objects. When not
 |          specified, the default comparer is used. Note: this argument will be
 |          ignored in the Python implementation of Rx. Python objects know,
 |          or should know how to compare themselves.
 |      
 |      Returns a sequence of observable groups, each of which corresponds to
 |      a unique key value, containing all elements that share that same key
 |      value. If a group's lifetime expires, a new group with the same key
 |      value can be created once an element with such a key value is
 |      encountered.
 |  
 |  group_join(self, right, left_duration_selector, right_duration_selector, result_selector)
 |      Correlates the elements of two sequences based on overlapping
 |      durations, and groups the results.
 |      
 |      Keyword arguments:
 |      right -- The right observable sequence to join elements for.
 |      left_duration_selector -- A function to select the duration (expressed
 |          as an observable sequence) of each element of the left observable
 |          sequence, used to determine overlap.
 |      right_duration_selector -- A function to select the duration (expressed
 |          as an observable sequence) of each element of the right observable
 |          sequence, used to determine overlap.
 |      result_selector -- A function invoked to compute a result element for
 |          any element of the left sequence with overlapping elements from the
 |          right observable sequence. The first parameter passed to the
 |          function is an element of the left sequence. The second parameter
 |          passed to the function is an observable sequence with elements from
 |          the right sequence that overlap with the left sequence's element.
 |      
 |      Returns an observable sequence that contains result elements computed
 |      from source elements that have an overlapping duration.
 |  
 |  ignore_elements(self)
 |      Ignores all elements in an observable sequence leaving only the
 |      termination messages.
 |      
 |      Returns an empty observable {Observable} sequence that signals
 |      termination, successful or exceptional, of the source sequence.
 |  
 |  is_empty(self)
 |      Determines whether an observable sequence is empty.
 |      
 |      Returns an observable {Observable} sequence containing a single element
 |      determining whether the source sequence is empty.
 |  
 |  join(self, right, left_duration_selector, right_duration_selector, result_selector)
 |      Correlates the elements of two sequences based on overlapping
 |      durations.
 |      
 |      Keyword arguments:
 |      right -- The right observable sequence to join elements for.
 |      left_duration_selector -- A function to select the duration (expressed
 |          as an observable sequence) of each element of the left observable
 |          sequence, used to determine overlap.
 |      right_duration_selector -- A function to select the duration (expressed
 |          as an observable sequence) of each element of the right observable
 |          sequence, used to determine overlap.
 |      result_selector -- A function invoked to compute a result element for
 |          any two overlapping elements of the left and right observable
 |          sequences. The parameters passed to the function correspond with
 |          the elements from the left and right source sequences for which
 |          overlap occurs.
 |      
 |      Return an observable sequence that contains result elements computed
 |      from source elements that have an overlapping duration.
 |  
 |  last(self, predicate=None)
 |      Returns the last element of an observable sequence that satisfies the
 |      condition in the predicate if specified, else the last element.
 |      
 |      Example:
 |      res = source.last()
 |      res = source.last(lambda x: x > 3)
 |      
 |      Keyword arguments:
 |      predicate -- {Function} [Optional] A predicate function to evaluate for
 |          elements in the source sequence.
 |      
 |      Returns {Observable} Sequence containing the last element in the
 |      observable sequence that satisfies the condition in the predicate.
 |  
 |  last_or_default(self, predicate=None, default_value=None)
 |      Return last or default element.
 |      
 |      Returns the last element of an observable sequence that satisfies
 |      the condition in the predicate, or a default value if no such
 |      element exists.
 |      
 |      Examples:
 |      res = source.last_or_default()
 |      res = source.last_or_default(lambda x: x > 3)
 |      res = source.last_or_default(lambda x: x > 3, 0)
 |      res = source.last_or_default(None, 0)
 |      
 |      predicate -- {Function} [Optional] A predicate function to evaluate
 |          for elements in the source sequence.
 |      default_value -- [Optional] The default value if no such element
 |          exists. If not specified, defaults to None.
 |      
 |      Returns {Observable} Sequence containing the last element in the
 |      observable sequence that satisfies the condition in the predicate,
 |      or a default value if no such element exists.
 |  
 |  let = let_bind(self, func)
 |  
 |  let_bind(self, func)
 |      Returns an observable sequence that is the result of invoking the
 |      selector on the source sequence, without sharing subscriptions. This
 |      operator allows for a fluent style of writing queries that use the same
 |      sequence multiple times.
 |      
 |      func -- {Function} Selector function which can use the source
 |          sequence as many times as needed, without sharing subscriptions to
 |          the source sequence.
 |      
 |      Returns an observable {Observable} sequence that contains the elements
 |      of a sequence produced by multicasting the source sequence within a
 |      selector function.
 |  
 |  many_select(self, selector, scheduler=None)
 |      Comonadic bind operator. Internally projects a new observable for each
 |      value, and it pushes each observable into the user-defined selector function
 |      that projects/queries each observable into some result.
 |      
 |      Keyword arguments:
 |      selector -- {Function} A transform function to apply to each element.
 |      scheduler -- {Object} [Optional] Scheduler used to execute the
 |          operation. If not specified, defaults to the ImmediateScheduler.
 |      
 |      Returns {Observable} An observable sequence which results from the
 |      comonadic bind operation.
 |  
 |  map = select(self, selector)
 |  
 |  materialize(self)
 |      Materializes the implicit notifications of an observable sequence as
 |      explicit notification values.
 |      
 |      Returns an observable sequence containing the materialized notification
 |      values from the source sequence.
 |  
 |  max(self, comparer=None)
 |      Returns the maximum value in an observable sequence according to the
 |      specified comparer.
 |      
 |      Example
 |      res = source.max()
 |      res = source.max(lambda x, y: x.value - y.value)
 |      
 |      Keyword arguments:
 |      comparer -- {Function} [Optional] Comparer used to compare elements.
 |      
 |      Returns {Observable} An observable sequence containing a single element
 |      with the maximum element in the source sequence.
 |  
 |  max_by(self, key_selector, comparer=None)
 |      Returns the elements in an observable sequence with the maximum
 |      key value according to the specified comparer.
 |      
 |      Example
 |      res = source.max_by(lambda x: x.value)
 |      res = source.max_by(lambda x: x.value, lambda x, y: x - y)
 |      
 |      Keyword arguments:
 |      key_selector -- {Function} Key selector function.
 |      comparer -- {Function} [Optional] Comparer used to compare key values.
 |      
 |      Returns an observable {Observable} sequence containing a list of zero
 |      or more elements that have a maximum key value.
 |  
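The "list of zero or more elements" in the max_by return value can be pictured with a plain-Python sketch. This does not use rx; the helper name max_by and the sample words are illustrative assumptions, and the default ordering is assumed to be Python's ordinary > comparison on keys.

```python
def max_by(values, key_selector):
    """Return every value whose key equals the maximum key (plain-Python analogy)."""
    best = []        # all values seen so far that share the maximum key
    best_key = None  # the maximum key seen so far
    for value in values:
        key = key_selector(value)
        if not best or key > best_key:
            # A strictly larger key starts a fresh result list.
            best, best_key = [value], key
        elif key == best_key:
            # A tie on the maximum key is kept alongside the others.
            best.append(value)
    return best


words = ["kiwi", "fig", "plum", "pear"]
longest = max_by(words, len)
print(longest)  # ['kiwi', 'plum', 'pear']
```

An empty input gives an empty list, which is the "zero elements" case.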
 |  merge_all(self)
 |      Merges an observable sequence of observable sequences into an
 |      observable sequence.
 |      
 |      Returns the observable sequence that merges the elements of the inner
 |      sequences.
 |  
 |  merge_observable = merge_all(self)
 |  
 |  min(self, comparer=None)
 |      Returns the minimum element in an observable sequence according to
 |      the optional comparer, or to the default less-than/greater-than
 |      comparison if no comparer is given.
 |      
 |      Example
 |      res = source.min()
 |      res = source.min(lambda x, y: x.value - y.value)
 |      
 |      comparer -- {Function} [Optional] Comparer used to compare elements.
 |      
 |      Returns an observable sequence {Observable} containing a single element
 |      with the minimum element in the source sequence.
 |  
 |  min_by(self, key_selector, comparer=None)
 |      Returns the elements in an observable sequence with the minimum key
 |      value according to the specified comparer.
 |      
 |      Example
 |      res = source.min_by(lambda x: x.value)
 |      res = source.min_by(lambda x: x.value, lambda x, y: x - y)
 |      
 |      Keyword arguments:
 |      key_selector -- {Function} Key selector function.
 |      comparer -- {Function} [Optional] Comparer used to compare key values.
 |      
 |      Returns an observable {Observable} sequence containing a list of zero
 |      or more elements that have a minimum key value.
 |  
 |  multicast(self, subject=None, subject_selector=None, selector=None)
 |      Multicasts the source sequence notifications through an instantiated
 |      subject into all uses of the sequence within a selector function. Each
 |      subscription to the resulting sequence causes a separate multicast
 |      invocation, exposing the sequence resulting from the selector function's
 |      invocation. For specializations with fixed subject types, see Publish,
 |      PublishLast, and Replay.
 |      
 |      Example:
 |      1 - res = source.multicast(observable)
 |      2 - res = source.multicast(subject_selector=lambda: Subject(),
 |                                 selector=lambda x: x)
 |      
 |      Keyword arguments:
 |      subject_selector -- {Function} Factory function to create an
 |          intermediate subject through which the source sequence's elements
 |          will be multicast to the selector function.
 |      subject -- Subject {Subject} to push source elements into.
 |      selector -- {Function} [Optional] Optional selector function which can
 |          use the multicasted source sequence subject to the policies enforced
 |          by the created subject. Specified only if subject_selector is a
 |          factory function.
 |      
 |      Returns an observable {Observable} sequence that contains the elements
 |      of a sequence produced by multicasting the source sequence within a
 |      selector function.
 |  
 |  observe_on(self, scheduler)
 |      Wraps the source sequence in order to run its observer callbacks on
 |      the specified scheduler.
 |      
 |      Keyword arguments:
 |      scheduler -- Scheduler to notify observers on.
 |      
 |      Returns the source sequence whose observations happen on the specified
 |      scheduler.
 |      
 |      This only invokes observer callbacks on a scheduler. If the
 |      subscription and/or unsubscription actions have side effects
 |      that need to run on a scheduler, use subscribe_on.
 |  
 |  pairwise(self)
 |      Returns a new observable that triggers on the second and subsequent
 |      triggerings of the input observable. The Nth triggering of the input
 |      observable passes the arguments from the N-1th and Nth triggering as a
 |      pair. The argument passed to the N-1th triggering is held in hidden
 |      internal state until the Nth triggering occurs.
 |      
 |      Returns an observable {Observable} that triggers on successive pairs of
 |      observations from the input observable as an array.
 |  
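The pairing rule described for pairwise can be sketched in plain Python. This is an analogy over an ordinary iterable, not the rx implementation; the generator name pairwise and the use of tuples for each pair are assumptions made for illustration.

```python
def pairwise(values):
    """Yield (previous, current) starting from the second item."""
    iterator = iter(values)
    try:
        previous = next(iterator)  # held back until the next item arrives
    except StopIteration:
        return  # empty input: nothing to pair
    for current in iterator:
        yield (previous, current)
        previous = current


pairs = list(pairwise([1, 2, 3, 4]))
print(pairs)  # [(1, 2), (2, 3), (3, 4)]
```

A single-item input produces no pairs, matching the statement that the result only triggers from the second item onward.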
 |  partition(self, predicate)
 |      Returns two observables which partition the observations of the
 |      source by the given function. The first will trigger observations for
 |      those values for which the predicate returns true. The second will
 |      trigger observations for those values where the predicate returns false.
 |      The predicate is executed once for each subscribed observer. Both also
 |      propagate all error observations arising from the source and each
 |      completes when the source completes.
 |      
 |      Keyword arguments:
 |      predicate -- The function to determine which output Observable will
 |          trigger a particular observation.
 |      
 |      Returns a list of observables. The first triggers when the predicate
 |      returns True, and the second triggers when the predicate returns False.
 |  
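As a rough plain-Python picture of partition, the two results behave like two independent filters over the same source. The helper below is a hypothetical stand-in that works on a reusable iterable, not the rx operator itself; it evaluates the predicate separately for each output, which loosely mirrors the remark that the predicate runs once per subscribed observer.

```python
def partition(values, predicate):
    """Return [matching, non_matching] as two independently filtered lists."""
    matching = [v for v in values if predicate(v)]
    non_matching = [v for v in values if not predicate(v)]
    return [matching, non_matching]


evens, odds = partition(range(6), lambda x: x % 2 == 0)
print(evens)  # [0, 2, 4]
print(odds)   # [1, 3, 5]
```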
 |  pausable(self, pauser)
 |      Pauses the underlying observable sequence based upon the observable
 |      sequence which yields True/False.
 |      
 |      Example:
 |      pauser = rx.Subject()
 |      source = rx.Observable.interval(100).pausable(pauser)
 |      
 |      Keyword arguments:
 |      pauser -- {Observable} The observable sequence used to pause the
 |          underlying sequence.
 |      
 |      Returns the observable {Observable} sequence which is paused based upon
 |      the pauser.
 |  
 |  pausable_buffered(self, subject)
 |      Pauses the underlying observable sequence based upon the observable
 |      sequence which yields True/False, and yields the values that were
 |      buffered while paused.
 |      
 |      Example:
 |      pauser = rx.Subject()
 |      source = rx.Observable.interval(100).pausable_buffered(pauser)
 |      
 |      Keyword arguments:
 |      pauser -- {Observable} The observable sequence used to pause the
 |          underlying sequence.
 |      
 |      Returns the observable {Observable} sequence which is paused based upon
 |      the pauser.
 |  
 |  pluck(self, key)
 |      Retrieves the value of a specified key using dict-like access (as in
 |      element[key]) from all elements in the Observable sequence.
 |      
 |      Keyword arguments:
 |      key -- {String} The key to pluck.
 |      
 |      Returns a new Observable {Observable} sequence of key values.
 |      
 |      To pluck an attribute of each element, use pluck_attr.
 |  
 |  pluck_attr(self, property)
 |      Retrieves the value of a specified property (using getattr) from all
 |      elements in the Observable sequence.
 |      
 |      Keyword arguments:
 |      property -- {String} The property to pluck.
 |      
 |      Returns a new Observable {Observable} sequence of property values.
 |      
 |      To pluck values using dict-like access (as in element[key]) on each
 |      element, use pluck.
 |  
 |  publish(self, selector=None)
 |      Returns an observable sequence that is the result of invoking the
 |      selector on a connectable observable sequence that shares a single
 |      subscription to the underlying sequence. This operator is a
 |      specialization of Multicast using a regular Subject.
 |      
 |      Example:
 |      res = source.publish()
 |      res = source.publish(lambda x: x)
 |      
 |      selector -- {Function} [Optional] Selector function which can use the
 |          multicasted source sequence as many times as needed, without causing
 |          multiple subscriptions to the source sequence. Subscribers to the
 |          given source will receive all notifications of the source from the
 |          time of the subscription on.
 |      
 |      Returns an observable {Observable} sequence that contains the elements
 |      of a sequence produced by multicasting the source sequence within a
 |      selector function.
 |  
 |  publish_value(self, initial_value, selector=None)
 |      Returns an observable sequence that is the result of invoking the
 |      selector on a connectable observable sequence that shares a single
 |      subscription to the underlying sequence and starts with initial_value.
 |      
 |      This operator is a specialization of Multicast using a BehaviorSubject.
 |      
 |      Example:
 |      res = source.publish_value(42)
 |      res = source.publish_value(42, lambda x: x.map(lambda y: y * y))
 |      
 |      Keyword arguments:
 |      initial_value -- {Mixed} Initial value received by observers upon
 |          subscription.
 |      selector -- {Function} [Optional] Optional selector function which can
 |          use the multicasted source sequence as many times as needed, without
 |          causing multiple subscriptions to the source sequence. Subscribers
 |          to the given source will immediately receive the initial
 |          value, followed by all notifications of the source from the time of
 |          the subscription on.
 |      
 |      Returns {Observable} An observable sequence that contains the elements
 |      of a sequence produced by multicasting the source sequence within a
 |      selector function.
 |  
 |  reduce(self, accumulator, seed=None)
 |      Applies an accumulator function over an observable sequence,
 |      returning the result of the aggregation as a single element in the
 |      result sequence. The specified seed value is used as the initial
 |      accumulator value.
 |      
 |      For aggregation behavior with incremental intermediate results, see
 |      Observable.scan.
 |      
 |      Example:
 |      1 - res = source.reduce(lambda acc, x: acc + x)
 |      2 - res = source.reduce(lambda acc, x: acc + x, 0)
 |      
 |      Keyword arguments:
 |      :param types.FunctionType accumulator: An accumulator function to be
 |          invoked on each element.
 |      :param T seed: Optional initial accumulator value.
 |      
 |      :returns: An observable sequence containing a single element with the
 |          final accumulator value.
 |      :rtype: Observable
 |  
 |  replay(self, selector, buffer_size=None, window=None, scheduler=None)
 |      Returns an observable sequence that is the result of invoking the
 |      selector on a connectable observable sequence that shares a single
 |      subscription to the underlying sequence replaying notifications subject
 |      to a maximum time length for the replay buffer.
 |      
 |      This operator is a specialization of Multicast using a ReplaySubject.
 |      
 |      Example:
 |      res = source.replay(buffer_size=3)
 |      res = source.replay(buffer_size=3, window=500)
 |      res = source.replay(None, 3, 500, scheduler)
 |      res = source.replay(lambda x: x.take(6).repeat(), 3, 500, scheduler)
 |      
 |      Keyword arguments:
 |      selector -- [Optional] Selector function which can use the multicasted
 |          source sequence as many times as needed, without causing multiple
 |          subscriptions to the source sequence. Subscribers to the given
 |          source will receive all the notifications of the source subject to
 |          the specified replay buffer trimming policy.
 |      buffer_size -- [Optional] Maximum element count of the replay buffer.
 |      window -- [Optional] Maximum time length of the replay buffer.
 |      scheduler -- [Optional] Scheduler where connected observers within the
 |          selector function will be invoked on.
 |      
 |      Returns {Observable} An observable sequence that contains the elements
 |      of a sequence produced by multicasting the source sequence within a
 |      selector function.
 |  
 |  retry(self, retry_count=None)
 |      Repeats the source observable sequence the specified number of times
 |      or until it successfully terminates. If the retry count is not
 |      specified, it retries indefinitely.
 |      
 |      1 - retried = source.retry()
 |      2 - retried = source.retry(42)
 |      
 |      retry_count -- [Optional] Number of times to retry the sequence. If not
 |          provided, retry the sequence indefinitely.
 |      
 |      Returns an observable sequence producing the elements of the given
 |      sequence repeatedly until it terminates successfully.
 |  
 |  sample(self, interval=None, sampler=None, scheduler=None)
 |      Samples the observable sequence at each interval.
 |      
 |      1 - res = source.sample(sample_observable) # Sampler tick sequence
 |      2 - res = source.sample(5000) # 5 seconds
 |      3 - res = source.sample(5000, rx.scheduler.timeout) # 5 seconds
 |      
 |      Keyword arguments:
 |      source -- Source sequence to sample.
 |      interval -- Interval at which to sample (specified as an integer
 |          denoting milliseconds).
 |      scheduler -- [Optional] Scheduler to run the sampling timer on. If not
 |          specified, the timeout scheduler is used.
 |      
 |      Returns sampled observable sequence.
 |  
 |  scan(self, accumulator, seed=None)
 |      Applies an accumulator function over an observable sequence and
 |      returns each intermediate result. The optional seed value is used as
 |      the initial accumulator value. For aggregation behavior with no
 |      intermediate results, see Observable.aggregate.
 |      
 |      1 - scanned = source.scan(lambda acc, x: acc + x)
 |      2 - scanned = source.scan(lambda acc, x: acc + x, 0)
 |      
 |      Keyword arguments:
 |      accumulator -- An accumulator function to be invoked on each element.
 |      seed -- [Optional] The initial accumulator value.
 |      
 |      Returns an observable sequence containing the accumulated values.
 |  
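The difference between scan (every intermediate result) and reduce (only the final result) has a close analogy in the Python standard library. The sketch below uses itertools.accumulate and functools.reduce on a plain list rather than rx, so it only illustrates the accumulation pattern, not scheduling or subscription behaviour.

```python
from functools import reduce
from itertools import accumulate

values = [1, 2, 3, 4]

# scan-like: one output per input, each holding the running total
running = list(accumulate(values, lambda acc, x: acc + x))

# reduce-like: a single output holding the final total
total = reduce(lambda acc, x: acc + x, values)

# reduce-like with an explicit seed as the initial accumulator value
seeded_total = reduce(lambda acc, x: acc + x, values, 0)

print(running)       # [1, 3, 6, 10]
print(total)         # 10
print(seeded_total)  # 10
```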
 |  select(self, selector)
 |      Project each element of an observable sequence into a new form
 |      by incorporating the element's index.
 |      
 |      1 - source.map(lambda value: value * value)
 |      2 - source.map(lambda value, index: value * value + index)
 |      
 |      Keyword arguments:
 |      :param Callable[[Any, Any], Any] selector: A transform function to
 |          apply to each source element; the second parameter of the
 |          function represents the index of the source element.
 |      :rtype: Observable
 |      
 |      Returns an observable sequence whose elements are the result of
 |      invoking the transform function on each element of source.
 |  
 |  select_many(self, selector, result_selector=None)
 |      One of the following:
 |      Projects each element of an observable sequence to an observable
 |      sequence and merges the resulting observable sequences into one
 |      observable sequence.
 |      
 |      1 - source.select_many(lambda x: Observable.range(0, x))
 |      
 |      Or:
 |      Projects each element of an observable sequence to an observable
 |      sequence, invokes the result selector for the source element and each
 |      of the corresponding inner sequence's elements, and merges the results
 |      into one observable sequence.
 |      
 |      1 - source.select_many(lambda x: Observable.range(0, x), lambda x, y: x + y)
 |      
 |      Or:
 |      Projects each element of the source observable sequence to the other
 |      observable sequence and merges the resulting observable sequences into
 |      one observable sequence.
 |      
 |      1 - source.select_many(Observable.from_([1,2,3]))
 |      
 |      Keyword arguments:
 |      selector -- A transform function to apply to each element or an
 |          observable sequence to project each element from the source
 |          sequence onto.
 |      result_selector -- [Optional] A transform function to apply to each
 |          element of the intermediate sequence.
 |      
 |      Returns an observable sequence whose elements are the result of
 |      invoking the one-to-many transform function collectionSelector on each
 |      element of the input sequence and then mapping each of those sequence
 |      elements and their corresponding source element to a result element.
 |  
 |  select_switch(self, selector)
 |      Projects each element of an observable sequence into a new sequence
 |      of observable sequences by incorporating the element's index and then
 |      transforms an observable sequence of observable sequences into an
 |      observable sequence producing values only from the most recent
 |      observable sequence.
 |      
 |      Keyword arguments:
 |      selector -- {Function} A transform function to apply to each source
 |          element; the second parameter of the function represents the index
 |          of the source element.
 |      
 |      Returns an observable {Observable} sequence whose elements are the
 |      result of invoking the transform function on each element of source
 |      producing an Observable of Observable sequences and that at any point in
 |      time produces the elements of the most recent inner observable sequence
 |      that has been received.
 |  
 |  sequence_equal(self, second, comparer=None)
 |      Determines whether two sequences are equal by comparing the
 |      elements pairwise using a specified equality comparer.
 |      
 |      1 - res = source.sequence_equal([1,2,3])
 |      2 - res = source.sequence_equal([{ "value": 42 }], lambda x, y: x.value == y.value)
 |      3 - res = source.sequence_equal(Observable.return_value(42))
 |      4 - res = source.sequence_equal(Observable.return_value({ "value": 42 }), lambda x, y: x.value == y.value)
 |      
 |      second -- Second observable sequence or array to compare.
 |      comparer -- [Optional] Comparer used to compare elements of both sequences.
 |      
 |      Returns an observable sequence that contains a single element which
 |      indicates whether both sequences are of equal length and their
 |      corresponding elements are equal according to the specified equality
 |      comparer.
 |  
 |  share(self)
 |      Share a single subscription among multiple observers.
 |      
 |      Returns a new Observable that multicasts (shares) the original
 |      Observable. As long as there is at least one Subscriber this
 |      Observable will be subscribed and emitting data. When all
 |      subscribers have unsubscribed it will unsubscribe from the source
 |      Observable.
 |      
 |      This is an alias for Observable.publish().ref_count().
 |  
 |  single(self, predicate=None)
 |      Returns the only element of an observable sequence that satisfies the
 |      condition in the optional predicate, and reports an exception if there
 |      is not exactly one element in the observable sequence.
 |      
 |      Example:
 |      res = source.single()
 |      res = source.single(lambda x: x == 42)
 |      
 |      Keyword arguments:
 |      predicate -- {Function} [Optional] A predicate function to evaluate for
 |          elements in the source sequence.
 |      
 |      Returns {Observable} Sequence containing the single element in the
 |      observable sequence that satisfies the condition in the predicate.
 |  
 |  single_or_default(self, predicate, default_value)
 |      Returns the only element of an observable sequence that matches the
 |      predicate, or a default value if no such element exists. This method
 |      reports an exception if there is more than one element in the
 |      observable sequence.
 |      
 |      Example:
 |      res = source.single_or_default()
 |      res = source.single_or_default(lambda x: x == 42)
 |      res = source.single_or_default(lambda x: x == 42, 0)
 |      res = source.single_or_default(None, 0)
 |      
 |      Keyword arguments:
 |      predicate -- {Function} A predicate function to evaluate for elements in
 |          the source sequence.
 |      default_value -- [Optional] The default value if no such element
 |          exists.
 |      
 |      Returns {Observable} Sequence containing the single element in the
 |      observable sequence that satisfies the condition in the predicate, or a
 |      default value if no such element exists.
 |  
 |  skip(self, count)
 |      Bypasses a specified number of elements in an observable sequence
 |      and then returns the remaining elements.
 |      
 |      Keyword arguments:
 |      count -- The number of elements to skip before returning the remaining
 |          elements.
 |      
 |      Returns an observable sequence that contains the elements that occur
 |      after the specified index in the input sequence.
 |  
 |  skip_last(self, count)
 |      Bypasses a specified number of elements at the end of an observable
 |      sequence.
 |      
 |      Description:
 |      This operator accumulates a queue with a length enough to store the
 |      first `count` elements. As more elements are received, elements are
 |      taken from the front of the queue and produced on the result sequence.
 |      This causes elements to be delayed.
 |      
 |      Keyword arguments:
 |      count -- Number of elements to bypass at the end of the source sequence.
 |      
 |      Returns an observable {Observable} sequence containing the source
 |      sequence elements except for the bypassed ones at the end.
 |  
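The queue behaviour described for skip_last can be sketched with collections.deque. This is a plain-Python generator written for illustration (the function name and sample data are assumptions), not the rx implementation: each item is released only once `count` newer items have arrived behind it, which is why output is delayed and the final `count` items never appear.

```python
from collections import deque


def skip_last(values, count):
    """Yield all items except the last `count`, using a bounded queue."""
    queue = deque()
    for value in values:
        queue.append(value)
        if len(queue) > count:
            # The oldest item now has `count` newer items behind it,
            # so it cannot be one of the last `count`; release it.
            yield queue.popleft()


result = list(skip_last([1, 2, 3, 4, 5], 2))
print(result)  # [1, 2, 3]
```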
 |  skip_last_with_time(self, duration, scheduler)
 |      Skips elements for the specified duration from the end of the
 |      observable source sequence, using the specified scheduler to run timers.
 |      
 |      1 - res = source.skip_last_with_time(5000)
 |      2 - res = source.skip_last_with_time(5000, scheduler)
 |      
 |      Description:
 |      This operator accumulates a queue with a length enough to store elements
 |      received during the initial duration window. As more elements are
 |      received, elements older than the specified duration are taken from the
 |      queue and produced on the result sequence. This causes elements to be
 |      delayed with duration.
 |      
 |      Keyword arguments:
 |      duration -- {Number} Duration for skipping elements from the end of the
 |          sequence.
 |      scheduler -- {Scheduler} [Optional] Scheduler to run the timer on. If
 |          not specified, defaults to Rx.Scheduler.timeout.
 |      
 |      Returns an observable {Observable} sequence with the elements skipped
 |      during the specified duration from the end of the source sequence.
 |  
 |  skip_until(self, other)
 |      Returns the values from the source observable sequence only after
 |      the other observable sequence produces a value.
 |      
 |      other -- The observable sequence that triggers propagation of elements
 |          of the source sequence.
 |      
 |      Returns an observable sequence containing the elements of the source
 |      sequence starting from the point the other sequence triggered
 |      propagation.
 |  
 |  skip_until_with_time(self, start_time, scheduler)
 |      Skips elements from the observable source sequence until the
 |      specified start time, using the specified scheduler to run timers.
 |      Errors produced by the source sequence are always forwarded to the
 |      result sequence, even if the error occurs before the start time.
 |      
 |      Examples:
 |      res = source.skip_until_with_time(datetime.utcnow(), [optional scheduler])
 |      res = source.skip_until_with_time(5000, [optional scheduler])
 |      
 |      Keyword arguments:
 |      start_time -- Time to start taking elements from the source sequence. If
 |          this value is less than or equal to the current time, no elements
 |          will be skipped.
 |      scheduler -- Scheduler to run the timer on. If not specified, defaults
 |          to rx.Scheduler.timeout.
 |      
 |      Returns {Observable} An observable sequence with the elements skipped
 |      until the specified start time.
 |  
 |  skip_while(self, predicate)
 |      Bypasses elements in an observable sequence as long as a specified
 |      condition is true and then returns the remaining elements. The
 |      element's index is used in the logic of the predicate function.
 |      
 |      1 - source.skip_while(lambda value: value < 10)
 |      2 - source.skip_while(lambda value, index: value < 10 or index < 10)
 |      
 |      predicate -- A function to test each element for a condition; the
 |          second parameter of the function represents the index of the
 |          source element.
 |      
 |      Returns an observable sequence that contains the elements from the
 |      input sequence starting at the first element in the linear series that
 |      does not pass the test specified by predicate.
 |  
 |  skip_with_time(self, duration, scheduler=None)
 |      Skips elements for the specified duration from the start of the
 |      observable source sequence, using the specified scheduler to run timers.
 |      
 |      Example:
 |      1 - res = source.skip_with_time(5000, [optional scheduler])
 |      
 |      Description:
 |      Specifying a zero value for duration doesn't guarantee no elements will
 |      be dropped from the start of the source sequence. This is a side-effect
 |      of the asynchrony introduced by the scheduler, where the action that
 |      causes callbacks from the source sequence to be forwarded may not
 |      execute immediately, despite the zero due time.
 |      
 |      Errors produced by the source sequence are always forwarded to the
 |      result sequence, even if the error occurs before the duration.
 |      
 |      Keyword arguments:
 |      duration -- {Number} Duration for skipping elements from the start of
 |          the sequence.
 |      scheduler -- {Scheduler} Scheduler to run the timer on. If not
 |          specified, defaults to Rx.Scheduler.timeout.
 |      
 |      Returns an observable {Observable} sequence with the elements skipped
 |      during the specified duration from the start of the source sequence.
 |  
 |  slice = slice_(self, start=None, stop=None, step=1)
 |      Slices the given observable. It is basically a wrapper around the
 |       operators skip(), skip_last(), take(), take_last() and filter().
 |      
 |       This marble diagram helps you remember how slices work with streams.
 |       Positive numbers are relative to the start of the events, while negative
 |       numbers are relative to the end (on_completed) of the stream.
 |      
 |       r---e---a---c---t---i---v---e---|
 |       0   1   2   3   4   5   6   7   8
 |      -8  -7  -6  -5  -4  -3  -2  -1
 |      
 |       Example:
 |       result = source.slice(1, 10)
 |       result = source.slice(1, -2)
 |       result = source.slice(1, -1, 2)
 |      
 |       Keyword arguments:
 |       :param Observable self: Observable to slice
 |       :param int start: Number of elements to skip or take last
 |       :param int stop: Last element to take or skip last
 |       :param int step: Takes every step element. Must be larger than zero
 |      
 |       :returns: Returns a sliced observable sequence.
 |       :rtype: Observable
 |  
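The marble diagram for slice lines up with ordinary Python list slicing, so the three documented examples can be previewed on a list of the same eight events. This is only an analogy, assuming the operator follows list-slice semantics as the diagram suggests; it does not call rx.

```python
# The eight events from the marble diagram, indexed 0..7 (or -8..-1).
events = list("reactive")

first = events[1:10]     # like source.slice(1, 10)
second = events[1:-2]    # like source.slice(1, -2)
third = events[1:-1:2]   # like source.slice(1, -1, 2)

print(first)   # ['e', 'a', 'c', 't', 'i', 'v', 'e']
print(second)  # ['e', 'a', 'c', 't', 'i']
print(third)   # ['e', 'c', 'i']
```

A negative stop can only be resolved once the end of the sequence is known, which is why the docstring ties negative numbers to on_completed.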
 |  some(self, predicate=None)
 |      Determines whether some element of an observable sequence satisfies a
 |      condition if one is given, else whether the sequence has any elements.
 |      
 |      Example:
 |      result = source.some()
 |      result = source.some(lambda x: x > 3)
 |      
 |      Keyword arguments:
 |      predicate -- A function to test each element for a condition.
 |      
 |      Returns {Observable} an observable sequence containing a single element
 |      determining whether some elements in the source sequence pass the test
 |      in the specified predicate if given, else whether the sequence contains
 |      any elements.
 |  
 |  start_with(self, *args, **kw)
 |      Prepends a sequence of values to an observable sequence with an
 |      optional scheduler and an argument list of values to prepend.
 |      
 |      1 - source.start_with(1, 2, 3)
 |      2 - source.start_with(Scheduler.timeout, 1, 2, 3)
 |      
 |      Returns the source sequence prepended with the specified values.
 |  
 |  subscribe(self, observer)
 |  
 |  subscribe_on(self, scheduler)
 |      Subscribe on the specified scheduler.
 |      
 |      Wrap the source sequence in order to run its subscription and
 |      unsubscription logic on the specified scheduler. This operation is not
 |      commonly used; see the remarks section for more information on the
 |      distinction between subscribe_on and observe_on.
 |      
 |      Keyword arguments:
 |      scheduler -- Scheduler to perform subscription and unsubscription
 |          actions on.
 |      
 |      Returns the source sequence whose subscriptions and unsubscriptions
 |      happen on the specified scheduler.
 |      
 |      This only performs the side-effects of subscription and unsubscription
 |      on the specified scheduler. In order to invoke observer callbacks on a
 |      scheduler, use observe_on.
 |  
 |  sum(self, key_selector=None)
 |      Computes the sum of a sequence of values that are obtained by
 |      invoking an optional transform function on each element of the input
 |      sequence, else if not specified computes the sum on each item in the
 |      sequence.
 |      
 |      Example
 |      res = source.sum()
 |      res = source.sum(lambda x: x.value)
 |      
 |      key_selector -- {Function} [Optional] A transform function to apply to
 |          each element.
 |      
 |      Returns an observable {Observable} sequence containing a single element
 |      with the sum of the values in the source sequence.
 |  
 |  switch_latest(self)
 |      Transforms an observable sequence of observable sequences into an
 |      observable sequence producing values only from the most recent
 |      observable sequence.
 |      
 |      :returns: The observable sequence that at any point in time produces the
 |      elements of the most recent inner observable sequence that has been
 |      received.
 |      :rtype: Observable
 |  
 |  switch_map = select_switch(self, selector)
 |  
 |  take(self, count, scheduler=None)
 |      Returns a specified number of contiguous elements from the start of
 |      an observable sequence, using the specified scheduler for the edge case
 |      of take(0).
 |      
 |      1 - source.take(5)
 |      2 - source.take(0, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      count -- The number of elements to return.
 |      scheduler -- [Optional] Scheduler used to produce an OnCompleted
 |          message in case count is set to 0.
 |      
 |      Returns an observable sequence that contains the specified number of
 |      elements from the start of the input sequence.
 |  
 |  take_last(self, count)
 |      Returns a specified number of contiguous elements from the end of an
 |      observable sequence.
 |      
 |      Example:
 |      res = source.take_last(5)
 |      
 |      Description:
 |      This operator accumulates a buffer long enough to store count elements.
 |      Upon completion of the source sequence, this
 |      buffer is drained on the result sequence. This causes the elements to be
 |      delayed.
 |      
 |      Keyword arguments:
 |      :param int count: Number of elements to take from the end of the source
 |          sequence.
 |      
 |      :returns: An observable sequence containing the specified number of elements 
 |          from the end of the source sequence.
 |      :rtype: Observable
 |  
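The buffering described for take_last can be pictured with a bounded deque. This is a plain-Python sketch under the assumption that the source is a finite iterable; `take_last_sketch` is a hypothetical helper, not part of RxPY.

```python
from collections import deque


def take_last_sketch(items, count):
    """Keep only the last `count` items and release them when the input ends."""
    buffer = deque(maxlen=count)  # older items fall off as newer ones arrive
    for item in items:
        buffer.append(item)       # nothing is emitted while the source runs
    return list(buffer)           # on completion, drain in arrival order


print(take_last_sketch(range(10), 3))
print(take_last_sketch([1, 2], 5))
```

Because nothing can be emitted before the source completes, every element in the result is delayed until the end of the input.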
 |  take_last_buffer(self, count)
 |      Returns an array with the specified number of contiguous elements
 |      from the end of an observable sequence.
 |      
 |      Example:
 |      res = source.take_last_buffer(5)
 |      
 |      Description:
 |      This operator accumulates a buffer long enough to store count elements.
 |      Upon completion of the source sequence, this
 |      buffer is drained on the result sequence. This causes the elements to be
 |      delayed.
 |      
 |      Keyword arguments:
 |      :param int count: Number of elements to take from the end of the source
 |          sequence.
 |      
 |      :returns: An observable sequence containing a single list with the specified 
 |      number of elements from the end of the source sequence.
 |      :rtype: Observable
 |  
 |  take_last_with_time(self, duration, scheduler=None)
 |      Returns elements within the specified duration from the end of the
 |      observable source sequence, using the specified schedulers to run timers
 |      and to drain the collected elements.
 |      
 |      Example:
 |      res = source.take_last_with_time(5000, scheduler)
 |      
 |      Description:
 |      This operator accumulates a queue with a length enough to store elements
 |      received during the initial duration window. As more elements are
 |      received, elements older than the specified duration are taken from the
 |      queue and produced on the result sequence. This causes elements to be
 |      delayed by the duration.
 |      
 |      Keyword arguments:
 |      duration -- {Number} Duration for taking elements from the end of the
 |          sequence.
 |      scheduler -- {Scheduler} [Optional] Scheduler to run the timer on. If
 |          not specified, defaults to rx.Scheduler.timeout.
 |      
 |      Returns {Observable} An observable sequence with the elements taken
 |      during the specified duration from the end of the source sequence.
 |  
 |  take_until(self, other)
 |      Returns the values from the source observable sequence until the
 |      other observable sequence produces a value.
 |      
 |      Keyword arguments:
 |      other -- Observable sequence that terminates propagation of elements of
 |          the source sequence.
 |      
 |      Returns an observable sequence containing the elements of the source
 |      sequence up to the point the other sequence interrupted further
 |      propagation.
 |  
 |  take_until_with_time(self, end_time, scheduler=None)
 |      Takes elements for the specified duration until the specified end
 |      time, using the specified scheduler to run timers.
 |      
 |      Examples:
 |      1 - res = source.take_until_with_time(dt, [optional scheduler])
 |      2 - res = source.take_until_with_time(5000, [optional scheduler])
 |      
 |      Keyword Arguments:
 |      end_time -- {Number | datetime} Time to stop taking elements from the source
 |          sequence. If this value is less than or equal to the current time, the
 |          result stream will complete immediately.
 |      scheduler -- {Scheduler} Scheduler to run the timer on.
 |      
 |      Returns an observable {Observable} sequence with the elements taken
 |      until the specified end time.
 |  
 |  take_while(self, predicate)
 |      Returns elements from an observable sequence as long as a specified
 |      condition is true. The element's index is used in the logic of the
 |      predicate function.
 |      
 |      1 - source.take_while(lambda value: value < 10)
 |      2 - source.take_while(lambda value, index: value < 10 or index < 10)
 |      
 |      Keyword arguments:
 |      predicate -- A function to test each element for a condition; the
 |          second parameter of the function represents the index of the source
 |          element.
 |      
 |      Returns an observable sequence that contains the elements from the
 |      input sequence that occur before the element at which the test no
 |      longer passes.
 |  
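The two predicate forms shown for take_while can be imitated on an ordinary list. The following is a plain-Python sketch, not RxPY code; `take_while_indexed` and `values` are hypothetical names, and the predicate here always receives both the value and its index.

```python
def take_while_indexed(items, predicate):
    """Yield items until predicate(value, index) first returns False."""
    for index, value in enumerate(items):
        if not predicate(value, index):
            break  # the first failure ends the sequence; later items are ignored
        yield value


values = [1, 5, 12, 3, 20]
print(list(take_while_indexed(values, lambda value, index: value < 10)))
print(list(take_while_indexed(values, lambda value, index: value < 10 or index < 3)))
```

Note that 3 is dropped in the first call even though it is below 10: once the test fails, no further elements are taken.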
 |  take_with_time(self, duration, scheduler=None)
 |      Takes elements for the specified duration from the start of the
 |      observable source sequence, using the specified scheduler to run timers.
 |      
 |      Example:
 |      res = source.take_with_time(5000,  [optional scheduler])
 |      
 |      Description:
 |      This operator forwards elements from the source as they arrive and
 |      completes the result sequence once the duration has elapsed, so
 |      elements are not buffered or delayed.
 |      
 |      Keyword arguments:
 |      duration -- {Number} Duration for taking elements from the start of the
 |          sequence.
 |      scheduler -- {Scheduler} Scheduler to run the timer on. If not
 |          specified, defaults to rx.Scheduler.timeout.
 |      
 |      Returns {Observable} An observable sequence with the elements taken
 |      during the specified duration from the start of the source sequence.
 |  
 |  tap = do_action(self, on_next=None, on_error=None, on_completed=None, observer=None)
 |  
 |  then = then_do(self, selector)
 |  
 |  then_do(self, selector)
 |      Matches when the observable sequence has an available value and projects
 |      the value.
 |      
 |      :param types.FunctionType selector: Selector that will be invoked for values
 |          in the source sequence.
 |      :returns: Plan that produces the projected values, to be fed (with other
 |          plans) to the when operator.
 |      :rtype: Plan
 |  
 |  throttle_first(self, window_duration, scheduler=None)
 |      Returns an Observable that emits only the first item emitted by the
 |      source Observable during sequential time windows of a specified
 |      duration.
 |      
 |      Keyword arguments:
 |      window_duration -- {timedelta} time to wait before emitting another item
 |          after emitting the last item.
 |      scheduler -- {Scheduler} [Optional] the Scheduler to use internally to
 |          manage the timers that handle timeout for each item. If not
 |          provided, defaults to Scheduler.timeout.
 |      Returns {Observable} An Observable that performs the throttle operation.
 |  
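The windowing rule of throttle_first can be simulated without timers by attaching a timestamp to each item. This is a plain-Python sketch, not RxPY code; `throttle_first_sketch` and `timed_events` are hypothetical, times are in milliseconds, and treating the window boundary as inclusive is an assumption.

```python
def throttle_first_sketch(timed_items, window_duration):
    """timed_items: (time_ms, value) pairs in time order."""
    emitted = []
    last_emit = None
    for time_ms, value in timed_items:
        # Emit only if no item was emitted within the preceding window
        # (boundary treated as inclusive; this is an assumption).
        if last_emit is None or time_ms - last_emit >= window_duration:
            emitted.append(value)
            last_emit = time_ms
    return emitted


timed_events = [(0, "a"), (100, "b"), (450, "c"), (500, "d"), (520, "e"), (1100, "f")]
print(throttle_first_sketch(timed_events, 500))
```

Items "b", "c" and "e" are discarded because each arrives less than 500 ms after the previously emitted item.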
 |  throttle_last = sample(self, interval=None, sampler=None, scheduler=None)
 |  
 |  throttle_with_selector(self, throttle_duration_selector)
 |      Ignores values from an observable sequence which are followed by
 |      another value within a computed throttle duration.
 |      
 |      1 - res = source.throttle_with_selector(lambda x: rx.Observable.timer(x+x))
 |      
 |      Keyword arguments:
 |      throttle_duration_selector -- Selector function to retrieve a sequence
 |          indicating the throttle duration for each given element.
 |      
 |      Returns the throttled sequence.
 |  
 |  throttle_with_timeout = debounce(self, duetime, scheduler=None)
 |  
 |  time_interval(self, scheduler)
 |      Records the time interval between consecutive values in an
 |      observable sequence.
 |      
 |      1 - res = source.time_interval()
 |      2 - res = source.time_interval(Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      scheduler -- [Optional] Scheduler used to compute time intervals. If
 |          not specified, the timeout scheduler is used.
 |      
 |      Returns an observable sequence with time interval information on values.
 |  
 |  timeout(self, duetime, other=None, scheduler=None)
 |      Returns the source observable sequence or the other observable
 |      sequence if duetime elapses.
 |      
 |      1 - res = source.timeout(dt) # As a datetime
 |      2 - res = source.timeout(5000) # 5 seconds
 |      # As a datetime and timeout observable
 |      3 - res = source.timeout(dt, rx.Observable.return_value(42))
 |      # 5 seconds and timeout observable
 |      4 - res = source.timeout(5000, rx.Observable.return_value(42))
 |      # As a datetime, timeout observable and scheduler
 |      5 - res = source.timeout(dt, rx.Observable.return_value(42),
 |                               rx.Scheduler.timeout)
 |      # 5 seconds, timeout observable and scheduler
 |      6 - res = source.timeout(5000, rx.Observable.return_value(42),
 |                               rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      :param datetime|int duetime: Absolute (specified as a datetime object) or
 |          relative time (specified as an integer denoting milliseconds) when a
 |          timeout occurs.
 |      :param Observable other: Sequence to return in case of a timeout. If not
 |          specified, a timeout error throwing sequence will be used.
 |      :param Scheduler scheduler: Scheduler to run the timeout timers on. If not
 |          specified, the timeout scheduler is used.
 |      
 |      :returns: The source sequence switching to the other sequence in case of
 |          a timeout.
 |      :rtype: Observable
 |  
 |  timeout_with_selector(self, first_timeout=None, timeout_duration_selector=None, other=None)
 |      Returns the source observable sequence, switching to the other
 |      observable sequence if a timeout is signaled.
 |      
 |      1 - res = source.timeout_with_selector(rx.Observable.timer(500))
 |      2 - res = source.timeout_with_selector(rx.Observable.timer(500),
 |                  lambda x: rx.Observable.timer(200))
 |      3 - res = source.timeout_with_selector(rx.Observable.timer(500),
 |                  lambda x: rx.Observable.timer(200),
 |                  rx.Observable.return_value(42))
 |      
 |      first_timeout -- [Optional] Observable sequence that represents the
 |          timeout for the first element. If not provided, this defaults to
 |          Observable.never().
 |      timeout_duration_selector -- [Optional] Selector to retrieve an
 |          observable sequence that represents the timeout between the current
 |          element and the next element.
 |      other -- [Optional] Sequence to return in case of a timeout. If not
 |          provided, this is set to Observable.throw_exception().
 |      
 |      Returns the source sequence switching to the other sequence in case of
 |      a timeout.
 |  
 |  timestamp(self, scheduler=None)
 |      Records the timestamp for each value in an observable sequence.
 |      
 |      1 - res = source.timestamp() # produces { "value": x, "timestamp": ts }
 |      2 - res = source.timestamp(Scheduler.timeout)
 |      
 |      :param Scheduler scheduler: [Optional] Scheduler used to compute timestamps. If not
 |          specified, the timeout scheduler is used.
 |      
 |      Returns an observable sequence with timestamp information on values.
 |  
 |  to_blocking(self)
 |  
 |  to_dict(self, key_selector, element_selector=None)
 |      Converts the observable sequence to a dict.
 |      
 |      Keyword arguments:
 |      key_selector -- {Function} A function which produces the key for the
 |          dict.
 |      element_selector -- {Function} [Optional] A function which
 |          produces the element for the dict. If not present, defaults to the
 |          value from the observable sequence.
 |      Returns {Observable} An observable sequence with a single value of a dict
 |      containing the values from the observable sequence.
 |  
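The roles of key_selector and element_selector can be illustrated on a plain list. This is a sketch in ordinary Python rather than RxPY; `to_dict_sketch` and `words` are hypothetical names, and letting a later item overwrite an earlier one with the same key is an assumption about collision handling.

```python
def to_dict_sketch(items, key_selector, element_selector=None):
    """Build one dict from all items, as the single emitted value would be."""
    result = {}
    for item in items:
        # Without an element_selector the item itself becomes the value.
        value = element_selector(item) if element_selector else item
        result[key_selector(item)] = value  # later duplicates overwrite (assumed)
    return result


words = ["rx", "python", "stream"]
print(to_dict_sketch(words, len))
print(to_dict_sketch(words, lambda w: w[0], lambda w: w.upper()))
```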
 |  to_future(self, future_ctor=None)
 |      Converts an existing observable sequence to a Future
 |      
 |      Example:
 |      future = rx.Observable.return_value(42).to_future(trollius.Future)
 |      
 |      With config:
 |      rx.config["Future"] = trollius.Future
 |      future = rx.Observable.return_value(42).to_future()
 |      
 |      future_ctor -- {Function} [Optional] The constructor of the future.
 |          If not provided, it looks for it in rx.config.Future.
 |      
 |      Returns {Future} A future with the last value from the observable
 |      sequence.
 |  
 |  to_iterable = to_list(self)
 |  
 |  to_list(self)
 |      Creates a list from an observable sequence.
 |      
 |      Returns an observable sequence containing a single element with a list
 |      containing all the elements of the source sequence.
 |  
 |  to_set(self)
 |      Converts the observable sequence to a set.
 |      
 |      Returns {Observable} An observable sequence with a single value of a set
 |      containing the values from the observable sequence.
 |  
 |  transduce(self, transducer)
 |      Execute a transducer to transform the observable sequence.
 |      
 |      Keyword arguments:
 |      :param Transducer transducer: A transducer to execute.
 |      
 |      :returns: An Observable sequence containing the results from the
 |          transducer.
 |      :rtype: Observable
 |  
 |  where(self, predicate)
 |      Filters the elements of an observable sequence based on a predicate
 |      by incorporating the element's index.
 |      
 |      1 - source.filter(lambda value: value < 10)
 |      2 - source.filter(lambda value, index: value < 10 or index < 10)
 |      
 |      Keyword arguments:
 |      :param Observable self: Observable sequence to filter.
 |      :param (T, <int>) -> bool predicate: A function to test each source element
 |          for a condition; the second parameter of the function represents the
 |          index of the source element.
 |      
 |      :returns: An observable sequence that contains elements from the input
 |      sequence that satisfy the condition.
 |      :rtype: Observable
 |  
 |  window(self, window_openings=None, window_closing_selector=None)
 |      Projects each element of an observable sequence into zero or more
 |      windows.
 |      
 |      Keyword arguments:
 |      :param Observable window_openings: Observable sequence whose elements
 |          denote the creation of windows.
 |      :param types.FunctionType window_closing_selector: [Optional] A function
 |          invoked to define the closing of each produced window. It defines the
 |          boundaries of the produced windows (a window is started when the
 |          previous one is closed, resulting in non-overlapping windows).
 |      
 |      :returns: An observable sequence of windows.
 |      :rtype: Observable[Observable]
 |  
 |  window_with_count(self, count, skip=None)
 |      Projects each element of an observable sequence into zero or more
 |      windows which are produced based on element count information.
 |      
 |      1 - xs.window_with_count(10)
 |      2 - xs.window_with_count(10, 1)
 |      
 |      count -- Length of each window.
 |      skip -- [Optional] Number of elements to skip between creation of
 |          consecutive windows. If not specified, defaults to the count.
 |      
 |      Returns an observable sequence of windows.
 |  
 |  window_with_time(self, timespan, timeshift=None, scheduler=None)
 |  
 |  window_with_time_or_count(self, timespan, count, scheduler=None)
 |  
 |  ----------------------------------------------------------------------
 |  Class methods defined here:
 |  
 |  amb(*args) from abc.ABCMeta
 |      Propagates the observable sequence that reacts first.
 |      
 |      E.g. winner = rx.Observable.amb(xs, ys, zs)
 |      
 |      Returns an observable sequence that surfaces any of the given sequences,
 |      whichever reacted first.
 |  
 |  case(selector, sources, default_source=None, scheduler=None) from abc.ABCMeta
 |      Uses selector to determine which source in sources to use.
 |      There is an alias 'switch_case'.
 |      
 |      Example:
 |      1 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 })
 |      2 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 }, obs0)
 |      3 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 },
 |                                   scheduler=scheduler)
 |      
 |      Keyword arguments:
 |      :param types.FunctionType selector: The function which extracts the value
 |          to test in a case statement.
 |      :param dict sources: An object which has keys which correspond to the case
 |          statement labels.
 |      :param Observable default_source: The observable sequence or Promise that
 |          will be run if the sources are not matched. If this is not provided, it
 |          defaults to rx.Observable.empty with the specified scheduler.
 |      
 |      :returns: An observable sequence which is determined by a case statement.
 |      :rtype: Observable
 |  
 |  catch_exception(*args) from abc.ABCMeta
 |      Continues an observable sequence that is terminated by an
 |      exception with the next observable sequence.
 |      
 |      1 - res = Observable.catch_exception(xs, ys, zs)
 |      2 - res = Observable.catch_exception([xs, ys, zs])
 |      
 |      Returns an observable sequence containing elements from consecutive
 |      source sequences until a source sequence terminates successfully.
 |  
 |  combine_latest(*args) from abc.ABCMeta
 |      Merges the specified observable sequences into one observable
 |      sequence by using the selector function whenever any of the
 |      observable sequences produces an element.
 |      
 |      1 - obs = Observable.combine_latest(obs1, obs2, obs3,
 |                                         lambda o1, o2, o3: o1 + o2 + o3)
 |      2 - obs = Observable.combine_latest([obs1, obs2, obs3],
 |                                          lambda o1, o2, o3: o1 + o2 + o3)
 |      
 |      Returns an observable sequence containing the result of combining
 |      elements of the sources using the specified result selector
 |      function.
 |  
 |  concat(*args) from abc.ABCMeta
 |      Concatenates all the observable sequences.
 |      
 |      1 - res = Observable.concat(xs, ys, zs)
 |      2 - res = Observable.concat([xs, ys, zs])
 |      
 |      Returns an observable sequence that contains the elements of each given
 |      sequence, in sequential order.
 |  
 |  create(subscribe) from abc.ABCMeta
 |  
 |  create_with_disposable = create(subscribe) from abc.ABCMeta
 |  
 |  defer(observable_factory) from abc.ABCMeta
 |      Returns an observable sequence that invokes the specified factory
 |      function whenever a new observer subscribes.
 |      
 |      Example:
 |      1 - res = rx.Observable.defer(lambda: rx.Observable.from_([1,2,3]))
 |      
 |      Keyword arguments:
 |      :param types.FunctionType observable_factory: Observable factory function
 |          to invoke for each observer that subscribes to the resulting sequence.
 |      
 |      :returns: An observable sequence whose observers trigger an invocation
 |      of the given observable factory function.
 |      :rtype: Observable
 |  
 |  empty(scheduler=None) from abc.ABCMeta
 |      Returns an empty observable sequence, using the specified scheduler
 |      to send out the single OnCompleted message.
 |      
 |      1 - res = rx.Observable.empty()
 |      2 - res = rx.Observable.empty(rx.Scheduler.timeout)
 |      
 |      scheduler -- Scheduler to send the termination call on.
 |      
 |      Returns an observable sequence with no elements.
 |  
 |  for_in(sources, result_selector) from abc.ABCMeta
 |      Concatenates the observable sequences obtained by running the
 |      specified result selector for each element in source.
 |      
 |      sources -- {Array} An array of values to turn into an observable
 |          sequence.
 |      result_selector -- {Function} A function to apply to each item in the
 |          sources array to turn it into an observable sequence.
 |      Returns an observable {Observable} sequence from the concatenated
 |      observable sequences.
 |  
 |  from_ = from_iterable(iterable, scheduler=None) from abc.ABCMeta
 |      Converts an array to an observable sequence, using an optional
 |      scheduler to enumerate the array.
 |      
 |      1 - res = rx.Observable.from_iterable([1,2,3])
 |      2 - res = rx.Observable.from_iterable([1,2,3], rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      :param Observable cls: Observable class
 |      :param Scheduler scheduler: [Optional] Scheduler to run the
 |          enumeration of the input sequence on.
 |      
 |      :returns: The observable sequence whose elements are pulled from the
 |          given enumerable sequence.
 |      :rtype: Observable
 |  
 |  from_callback(func, selector=None) from abc.ABCMeta
 |      Converts a callback function to an observable sequence.
 |      
 |      Keyword arguments:
 |      func -- {Function} Function with a callback as the last parameter to
 |          convert to an Observable sequence.
 |      selector -- {Function} [Optional] A selector which takes the arguments
 |          from the callback to produce a single item to yield on next.
 |      
 |      Returns {Function} A function, when executed with the required
 |      parameters minus the callback, produces an Observable sequence with a
 |      single value of the arguments to the callback as a list.
 |  
 |  from_future(future) from abc.ABCMeta
 |      Converts a Future to an Observable sequence
 |      
 |      Keyword Arguments:
 |      future -- {Future} A Python 3 compatible future.
 |          https://docs.python.org/3/library/asyncio-task.html#future
 |          http://www.tornadoweb.org/en/stable/concurrent.html#tornado.concurrent.Future
 |      
 |      Returns {Observable} An Observable sequence which wraps the existing
 |      future success and failure.
 |  
 |  from_iterable(iterable, scheduler=None) from abc.ABCMeta
 |      Converts an array to an observable sequence, using an optional
 |      scheduler to enumerate the array.
 |      
 |      1 - res = rx.Observable.from_iterable([1,2,3])
 |      2 - res = rx.Observable.from_iterable([1,2,3], rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      :param Observable cls: Observable class
 |      :param Scheduler scheduler: [Optional] Scheduler to run the
 |          enumeration of the input sequence on.
 |      
 |      :returns: The observable sequence whose elements are pulled from the
 |          given enumerable sequence.
 |      :rtype: Observable
 |  
 |  from_list = from_iterable(iterable, scheduler=None) from abc.ABCMeta
 |      Converts an array to an observable sequence, using an optional
 |      scheduler to enumerate the array.
 |      
 |      1 - res = rx.Observable.from_iterable([1,2,3])
 |      2 - res = rx.Observable.from_iterable([1,2,3], rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      :param Observable cls: Observable class
 |      :param Scheduler scheduler: [Optional] Scheduler to run the
 |          enumeration of the input sequence on.
 |      
 |      :returns: The observable sequence whose elements are pulled from the
 |          given enumerable sequence.
 |      :rtype: Observable
 |  
 |  generate(initial_state, condition, iterate, result_selector, scheduler=None) from abc.ABCMeta
 |      Generates an observable sequence by running a state-driven loop
 |      producing the sequence's elements, using the specified scheduler to
 |      send out observer messages.
 |      
 |      1 - res = rx.Observable.generate(0,
 |          lambda x: x < 10,
 |          lambda x: x + 1,
 |          lambda x: x)
 |      2 - res = rx.Observable.generate(0,
 |          lambda x: x < 10,
 |          lambda x: x + 1,
 |          lambda x: x,
 |          rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      initial_state -- Initial state.
 |      condition -- Condition to terminate generation (upon returning False).
 |      iterate -- Iteration step function.
 |      result_selector -- Selector function for results produced in the
 |          sequence.
 |      scheduler -- [Optional] Scheduler on which to run the generator loop.
 |          If not provided, defaults to CurrentThreadScheduler.
 |      
 |      Returns the generated sequence.
 |  
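The state-driven loop behind generate corresponds to a short Python generator. This is a plain-Python sketch without schedulers or observers; `generate_sketch` is a hypothetical name and does not call RxPY.

```python
def generate_sketch(initial_state, condition, iterate, result_selector):
    """Yield result_selector(state) for each state while condition(state) holds."""
    state = initial_state
    while condition(state):          # generation stops when this returns False
        yield result_selector(state)
        state = iterate(state)       # advance to the next state


print(list(generate_sketch(0, lambda x: x < 10, lambda x: x + 1, lambda x: x)))
print(list(generate_sketch(1, lambda x: x < 100, lambda x: x * 2, lambda x: x)))
```

The condition is checked before each element is produced, so an initial state that already fails the condition yields an empty sequence.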
 |  generate_with_relative_time(initial_state, condition, iterate, result_selector, time_selector, scheduler=None) from abc.ABCMeta
 |      Generates an observable sequence by iterating a state from an
 |      initial state until the condition fails.
 |      
 |      res = rx.Observable.generate_with_relative_time(0,
 |          lambda x: True,
 |          lambda x: x + 1,
 |          lambda x: x,
 |          lambda x: 500)
 |      
 |      initial_state -- Initial state.
 |      condition -- Condition to terminate generation (upon returning False).
 |      iterate -- Iteration step function.
 |      result_selector -- Selector function for results produced in the
 |          sequence.
 |      time_selector -- Time selector function to control the speed of values
 |          being produced each iteration, returning integer values denoting
 |          milliseconds.
 |      scheduler -- [Optional] Scheduler on which to run the generator loop.
 |          If not specified, the timeout scheduler is used.
 |      
 |      Returns the generated sequence.
 |  
 |  if_then(condition, then_source, else_source=None, scheduler=None) from abc.ABCMeta
 |      Selects the then_source or the else_source depending on the result of
 |      the condition function.
 |      
 |      Example:
 |      1 - res = rx.Observable.if_then(condition, obs1)
 |      2 - res = rx.Observable.if_then(condition, obs1, obs2)
 |      3 - res = rx.Observable.if_then(condition, obs1, scheduler=scheduler)
 |      
 |      Keyword parameters:
 |      condition -- {Function} The condition which determines if the
 |          then_source or else_source will be run.
 |      then_source -- {Observable} The observable sequence or Promise that
 |          will be run if the condition function returns true.
 |      else_source -- {Observable} [Optional] The observable sequence or
 |          Promise that will be run if the condition function returns False.
 |          If this is not provided, it defaults to rx.Observable.empty
 |      scheduler -- [Optional] Scheduler to use.
 |      
 |      Returns an observable {Observable} sequence which is either the
 |      then_source or else_source.
 |  
 |  interval(period, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that produces a value after each
 |      period.
 |      
 |      Example:
 |      1 - res = rx.Observable.interval(1000)
 |      2 - res = rx.Observable.interval(1000, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      period -- Period for producing the values in the resulting sequence
 |          (specified as an integer denoting milliseconds).
 |      scheduler -- [Optional] Scheduler to run the timer on. If not specified,
 |          rx.Scheduler.timeout is used.
 |      
 |      Returns an observable sequence that produces a value after each period.
 |  
 |  just = return_value(value, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that contains a single element,
 |      using the specified scheduler to send out observer messages.
 |      There is an alias called 'just'.
 |      
 |      Example:
 |      res = rx.Observable.return_value(42)
 |      res = rx.Observable.return_value(42, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      value -- Single element in the resulting observable sequence.
 |      scheduler -- [Optional] Scheduler to send the single element on. If
 |          not specified, defaults to Scheduler.immediate.
 |      
 |      Returns an observable sequence containing the single specified
 |      element.
 |  
 |  merge(*args) from abc.ABCMeta
 |      Merges all the observable sequences into a single observable
 |      sequence. The scheduler is optional and if not specified, the
 |      immediate scheduler is used.
 |      
 |      1 - merged = rx.Observable.merge(xs, ys, zs)
 |      2 - merged = rx.Observable.merge([xs, ys, zs])
 |      3 - merged = rx.Observable.merge(scheduler, xs, ys, zs)
 |      4 - merged = rx.Observable.merge(scheduler, [xs, ys, zs])
 |      
 |      Returns the observable sequence that merges the elements of the
 |      observable sequences.
 |  
 |  never() from abc.ABCMeta
 |      Returns a non-terminating observable sequence, which can be used to
 |      denote an infinite duration (e.g. when using reactive joins).
 |      
 |      Returns an observable sequence whose observers will never get called.
 |  
 |  of(*args, **kwargs) from abc.ABCMeta
 |      This method creates a new Observable instance with a variable number
 |      of arguments, regardless of number or type of the arguments.
 |      
 |      Example:
 |      res = rx.Observable.of(1,2,3)
 |      
 |      Returns the observable sequence whose elements are pulled from the given
 |      arguments
 |  
 |  on_error_resume_next(*args) from abc.ABCMeta
 |      Continues an observable sequence that is terminated normally or by
 |      an exception with the next observable sequence.
 |      
 |      1 - res = Observable.on_error_resume_next(xs, ys, zs)
 |      2 - res = Observable.on_error_resume_next([xs, ys, zs])
 |      3 - res = Observable.on_error_resume_next(xs, factory)
 |      
 |      Returns an observable sequence that concatenates the source sequences,
 |      even if a sequence terminates exceptionally.
 |  
 |  range(start, count, scheduler=None) from abc.ABCMeta
 |      Generates an observable sequence of integral numbers within a
 |      specified range, using the specified scheduler to send out observer
 |      messages.
 |      
 |      1 - res = rx.Observable.range(0, 10)
 |      2 - res = rx.Observable.range(0, 10, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      start -- The value of the first integer in the sequence.
 |      count -- The number of sequential integers to generate.
 |      scheduler -- [Optional] Scheduler to run the generator loop on. If not
 |          specified, defaults to Scheduler.current_thread.
 |      
 |      Returns an observable sequence that contains a range of sequential
 |      integral numbers.
 |  
 |  repeat(value=None, repeat_count=None, scheduler=None) from abc.ABCMeta
 |      Generates an observable sequence that repeats the given element the
 |      specified number of times, using the specified scheduler to send out
 |      observer messages.
 |      
 |      1 - res = rx.Observable.repeat(42)
 |      2 - res = rx.Observable.repeat(42, 4)
 |      3 - res = rx.Observable.repeat(42, 4, rx.Scheduler.timeout)
 |      4 - res = rx.Observable.repeat(42, None, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      value -- Element to repeat.
 |      repeat_count -- [Optional] Number of times to repeat the element. If not
 |          specified, repeats indefinitely.
 |      scheduler -- Scheduler to run the producer loop on. If not specified,
 |          defaults to ImmediateScheduler.
 |      
 |      Returns an observable sequence that repeats the given element the
 |      specified number of times.
 |  
 |  return_value(value, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that contains a single element,
 |      using the specified scheduler to send out observer messages.
 |      There is an alias called 'just'.
 |      
 |      example
 |      res = rx.Observable.return_value(42)
 |      res = rx.Observable.return_value(42, rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      value -- Single element in the resulting observable sequence.
 |      scheduler -- [Optional] Scheduler to send the single element on. If
 |          not specified, defaults to Scheduler.immediate.
 |      
 |      Returns an observable sequence containing the single specified
 |      element.
 |  
 |  start(func, scheduler=None) from abc.ABCMeta
 |      Invokes the specified function asynchronously on the specified
 |      scheduler, surfacing the result through an observable sequence.
 |      
 |      Example:
 |      res = rx.Observable.start(lambda: pprint('hello'))
 |      res = rx.Observable.start(lambda: pprint('hello'), rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      func -- {Function} Function to run asynchronously.
 |      scheduler -- {Scheduler} [Optional] Scheduler to run the function on. If
 |          not specified, defaults to Scheduler.timeout.
 |      
 |      Returns {Observable} An observable sequence exposing the function's
 |      result value, or an exception.
 |      
 |      Remarks:
 |      The function is called immediately, not during the subscription of the
 |      resulting sequence. Multiple subscriptions to the resulting sequence can
 |      observe the function's result.
 |  
 |  start_async(function_async) from abc.ABCMeta
 |      Invokes the asynchronous function, surfacing the result through an
 |      observable sequence.
 |      
 |      Keyword arguments:
 |      :param types.FunctionType function_async: Asynchronous function which
 |          returns a Future to run.
 |      
 |      :returns: An observable sequence exposing the function's result value, or an
 |          exception.
 |      :rtype: Observable
 |  
 |  switch_case = case(selector, sources, default_source=None, scheduler=None) from abc.ABCMeta
 |      Uses selector to determine which source in sources to use.
 |      There is an alias 'switch_case'.
 |      
 |      Example:
 |      1 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 })
 |      2 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 }, obs0)
 |      3 - res = rx.Observable.case(selector, { '1': obs1, '2': obs2 },
 |                                   scheduler=scheduler)
 |      
 |      Keyword arguments:
 |      :param types.FunctionType selector: The function which extracts the value
 |          to test in a case statement.
 |      :param list sources: An object whose keys correspond to the case
 |          statement labels.
 |      :param Observable default_source: The observable sequence or Promise that
 |          will be run if the sources are not matched. If this is not provided, it
 |          defaults to rx.Observable.empty with the specified scheduler.
 |      
 |      :returns: An observable sequence which is determined by a case statement.
 |      :rtype: Observable
 |  
 |  throw(exception, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that terminates with an exception,
 |      using the specified scheduler to send out the single OnError message.
 |      
 |      1 - res = rx.Observable.throw_exception(Exception('Error'))
 |      2 - res = rx.Observable.throw_exception(Exception('Error'),
 |                                              rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      exception -- An object used for the sequence's termination.
 |      scheduler -- Scheduler to send the exceptional termination call on. If
 |          not specified, defaults to ImmediateScheduler.
 |      
 |      Returns the observable sequence that terminates exceptionally with the
 |      specified exception object.
 |  
 |  throw_exception = throw(exception, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that terminates with an exception,
 |      using the specified scheduler to send out the single OnError message.
 |      
 |      1 - res = rx.Observable.throw_exception(Exception('Error'))
 |      2 - res = rx.Observable.throw_exception(Exception('Error'),
 |                                              rx.Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      exception -- An object used for the sequence's termination.
 |      scheduler -- Scheduler to send the exceptional termination call on. If
 |          not specified, defaults to ImmediateScheduler.
 |      
 |      Returns the observable sequence that terminates exceptionally with the
 |      specified exception object.
 |  
 |  timer(duetime, period=None, scheduler=None) from abc.ABCMeta
 |      Returns an observable sequence that produces a value after duetime
 |      has elapsed and then after each period.
 |      
 |      1 - res = Observable.timer(datetime(...))
 |      2 - res = Observable.timer(datetime(...), 1000)
 |      3 - res = Observable.timer(datetime(...), Scheduler.timeout)
 |      4 - res = Observable.timer(datetime(...), 1000, Scheduler.timeout)
 |      
 |      5 - res = Observable.timer(5000)
 |      6 - res = Observable.timer(5000, 1000)
 |      7 - res = Observable.timer(5000, scheduler=Scheduler.timeout)
 |      8 - res = Observable.timer(5000, 1000, Scheduler.timeout)
 |      
 |      Keyword arguments:
 |      duetime -- Absolute (specified as a Date object) or relative time
 |          (specified as an integer denoting milliseconds) at which to produce
 |          the first value.
 |      period -- [Optional] Period to produce subsequent values (specified as
 |          an integer denoting milliseconds), or the scheduler to run the
 |          timer on. If not specified, the resulting timer is not recurring.
 |      scheduler -- [Optional] Scheduler to run the timer on. If not
 |          specified, the timeout scheduler is used.
 |      
 |      Returns an observable sequence that produces a value after due time has
 |      elapsed and then each period.
 |  
 |  to_async(func, scheduler=None) from abc.ABCMeta
 |      Converts the function into an asynchronous function. Each invocation
 |      of the resulting asynchronous function causes an invocation of the
 |      original synchronous function on the specified scheduler.
 |      
 |      Example:
 |      res = Observable.to_async(lambda x, y: x + y)(4, 3)
 |      res = Observable.to_async(lambda x, y: x + y, Scheduler.timeout)(4, 3)
 |      res = Observable.to_async(lambda x: log.debug(x),
 |                                Scheduler.timeout)('hello')
 |      
 |      func -- {Function} Function to convert to an asynchronous function.
 |      scheduler -- {Scheduler} [Optional] Scheduler to run the function on. If
 |          not specified, defaults to Scheduler.timeout.
 |      
 |      Returns {Function} Asynchronous function.
 |  
 |  using(resource_factory, observable_factory) from abc.ABCMeta
 |      Constructs an observable sequence that depends on a resource object,
 |      whose lifetime is tied to the resulting observable sequence's lifetime.
 |      
 |      1 - res = rx.Observable.using(lambda: AsyncSubject(), lambda s: s)
 |      
 |      Keyword arguments:
 |      resource_factory -- Factory function to obtain a resource object.
 |      observable_factory -- Factory function to obtain an observable sequence
 |          that depends on the obtained resource.
 |      
 |      Returns an observable sequence whose lifetime controls the lifetime of
 |      the dependent resource object.
 |  
 |  when(*args) from abc.ABCMeta
 |      Joins together the results from several patterns.
 |      
 |      :param Observable cls: Observable class.
 |      :param list[Plan] args: A series of plans (specified as a list or as a
 |          series of arguments) created by use of the Then operator on patterns.
 |      :returns: Observable sequence with the results from matching several
 |          patterns.
 |      :rtype: Observable
 |  
 |  while_do(condition, source) from abc.ABCMeta
 |      Repeats source as long as condition holds emulating a while loop.
 |      
 |      Keyword arguments:
 |      :param types.FunctionType condition: The condition which determines if the
 |          source will be repeated.
 |      :param Observable source: The observable sequence that will be run if the
 |          condition function returns true.
 |      
 |      :returns: An observable sequence which is repeated as long as the condition
 |          holds.
 |      :rtype: Observable
 |  
 |  with_latest_from(*args) from abc.ABCMeta
 |      Merges the specified observable sequences into one observable sequence
 |      by using the selector function only when the first observable sequence
 |      produces an element. The observables can be passed either as separate
 |      arguments or as a list.
 |      
 |      1 - obs = Observable.with_latest_from(obs1, obs2, obs3,
 |                                         lambda o1, o2, o3: o1 + o2 + o3)
 |      2 - obs = Observable.with_latest_from([obs1, obs2, obs3],
 |                                          lambda o1, o2, o3: o1 + o2 + o3)
 |      
 |      Returns an observable sequence containing the result of combining
 |      elements of the sources using the specified result selector function.
 |  
 |  zip(*args) from abc.ABCMeta
 |      Merges the specified observable sequences into one observable
 |      sequence by using the selector function whenever all of the observable
 |      sequences have produced an element at a corresponding index.
 |      
 |      The last element in the arguments must be a function to invoke for each
 |      series of elements at corresponding indexes in the sources.
 |      
 |      Arguments:
 |      args -- Observable sources.
 |      
 |      Returns an observable {Observable} sequence containing the result of
 |      combining elements of the sources using the specified result selector
 |      function.
 |  
 |  zip_array = zip_list(*args) from abc.ABCMeta
 |      Merge the specified observable sequences into one observable
 |      sequence by emitting a list with the elements of the observable
 |      sequences at corresponding indexes.
 |      
 |      Keyword arguments:
 |      :param Observable cls: Class
 |      :param Tuple args: Observable sources.
 |      
 |      :return: Returns an observable sequence containing lists of
 |      elements at corresponding indexes.
 |      :rtype: Observable
 |  
 |  zip_list(*args) from abc.ABCMeta
 |      Merge the specified observable sequences into one observable
 |      sequence by emitting a list with the elements of the observable
 |      sequences at corresponding indexes.
 |      
 |      Keyword arguments:
 |      :param Observable cls: Class
 |      :param Tuple args: Observable sources.
 |      
 |      :return: Returns an observable sequence containing lists of
 |      elements at corresponding indexes.
 |      :rtype: Observable
 |  
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |  
 |  __dict__
 |      dictionary for instance variables (if defined)
 |  
 |  __weakref__
 |      list of weak references to the object (if defined)
 |  
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |  
 |  __abstractmethods__ = frozenset({'subscribe'})

Concepts

Rx is about processing streams of events. With Rx you specify:

  • What you want to process (Observable)
  • How you want to process it (a composition of operators)
  • What you want to do with the result (Observer)

It's important to understand that with Rx you describe what you want to do with events if and when they arrive.

In other words:

  • You declare a composition of operators.

  • The operators process events when they arrive.

  • If nothing happens, then nothing is processed.

Thus the pattern is that you subscribe to an Observable using an Observer:

subscription = observable.subscribe(observer)

NOTE: Observables are not active in themselves. They must be subscribed to before anything happens. Simply having an Observable lying around does nothing.
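
The laziness described in the note can be illustrated without RxPY. The following is a minimal sketch in plain Python; `ToyObservable` is a hypothetical stand-in written for this illustration, not the library's implementation. It only shows that creating the object does no work and that subscribing is what drives the sequence.

```python
class ToyObservable:
    """Hypothetical toy stand-in for an Observable (not RxPY's implementation)."""

    def __init__(self, iterable):
        # The iterable is only stored here; nothing is iterated yet.
        self._iterable = iterable

    def subscribe(self, on_next):
        # Work starts here, once per subscription.
        for item in self._iterable:
            on_next(item)


received = []
source = ToyObservable(range(3))    # creating the observable emits nothing
assert received == []

source.subscribe(received.append)   # subscribing pushes the items to the callback
print(received)  # [0, 1, 2]
```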

Generating a Sequence


In [8]:
class MyObserver(Observer):
    def on_next(self, x):
        print("Got: %s" % x)
    
    def on_error(self, e):
        print("Got ERROR: %s" % e)
    
    def on_completed(self):
        print("Sequence completed")

In [9]:
subject = Observable.from_iterable(range(10))

In [10]:
subject


Out[10]:
<rx.core.anonymousobservable.AnonymousObservable at 0x1bfebbd080>

In [11]:
observer = MyObserver()

In [12]:
observer


Out[12]:
<__main__.MyObserver at 0x1bfebbd160>

In [13]:
subject.subscribe(observer)


Got: 0
Got: 1
Got: 2
Got: 3
Got: 4
Got: 5
Got: 6
Got: 7
Got: 8
Got: 9
Sequence completed
Out[13]:
<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1bfebbd2e8>

Concepts

  • The subscribe method takes an observer, or one to three callbacks for handling on_next(), on_error(), and on_completed().
  • This is why print can be passed directly as the observer in the examples that follow: it becomes the on_next() handler of an anonymous observer.
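
One way to picture the callback form is the sketch below. It is plain Python and makes no claim about how RxPY does this internally; `ToyAnonymousObserver` and `toy_subscribe` are hypothetical names introduced only for this illustration. Missing callbacks are replaced by no-ops, which is why passing a single function such as print is enough.

```python
class ToyAnonymousObserver:
    """Hypothetical observer built from up to three plain callbacks."""

    def __init__(self, on_next=None, on_error=None, on_completed=None):
        noop = lambda *args: None
        self.on_next = on_next or noop
        self.on_error = on_error or noop
        self.on_completed = on_completed or noop


def toy_subscribe(iterable, observer=None, on_error=None, on_completed=None):
    """Hypothetical helper: accept an observer object or bare callbacks."""
    if not hasattr(observer, "on_next"):
        # A bare callable was passed, so treat it as the on_next handler.
        observer = ToyAnonymousObserver(observer, on_error, on_completed)
    try:
        for item in iterable:
            observer.on_next(item)
    except Exception as exc:
        observer.on_error(exc)
    else:
        observer.on_completed()


log = []
toy_subscribe(range(3), log.append, on_completed=lambda: log.append("done"))
print(log)  # [0, 1, 2, 'done']
```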

Interactive Rx - Generating


In [14]:
def interactiveObservable(count):
    print("-" * 30)
    
    sequence = range(count)
    subject = Observable.from_iterable(sequence)
    observer = MyObserver()
    subject.subscribe(observer)

In [15]:
countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

interact(interactiveObservable, count=countSlider)


------------------------------
Got: 0
Got: 1
Got: 2
Sequence completed
Out[15]:
<function __main__.interactiveObservable>

In [16]:
from functools import partial

def interactiveSequenceObservable(observer, count):
    print("-" * 30)

    sequence = range(count)
    subject = Observable.from_iterable(sequence)
    subject.subscribe(observer)

Reference

Python Library: functools
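
The cells below use partial to bind the observer argument ahead of time and then copy __name__ onto the result. The sketch here shows both steps on a hypothetical greet function. The reason for copying the name is an assumption on my part: interact presumably reads the callback's __name__, which partial objects do not normally carry.

```python
from functools import partial


def greet(greeting, name):
    return "%s, %s!" % (greeting, name)


# partial() binds the first positional argument and returns a new callable.
hello = partial(greet, "Hello")
print(hello("Rx"))  # Hello, Rx!

# Check whether the partial object carries a __name__ of its own.
print(hasattr(hello, "__name__"))

# Partial objects accept new attributes, so the name can be copied over.
hello.__name__ = greet.__name__
print(hello.__name__)  # greet
```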


In [17]:
countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

callback = partial(interactiveSequenceObservable, MyObserver())
callback.__name__ = interactiveSequenceObservable.__name__

interact(callback, count=countSlider)


------------------------------
Got: 0
Got: 1
Got: 2
Sequence completed
Out[17]:
functools.partial(<function interactiveSequenceObservable at 0x0000001BFEBB6F28>, <__main__.MyObserver object at 0x0000001BFEBB0B38>)

In [18]:
countSlider = widgets.IntSlider(min=2, max=10, step=1, value=3)

callback = partial(interactiveSequenceObservable, print)
callback.__name__ = interactiveSequenceObservable.__name__

interact(callback, count=countSlider)


------------------------------
0
1
2
Out[18]:
functools.partial(<function interactiveSequenceObservable at 0x0000001BFEBB6F28>, <built-in function print>)

Filtering a Sequence


In [19]:
subject = Observable.from_iterable(range(10))
disposable = subject.filter(lambda x : x % 2).subscribe(print)
disposable


1
3
5
7
9
Out[19]:
<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1bfebbda20>
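
The predicate lambda x : x % 2 returns a remainder, not a boolean. The filter keeps an item when that remainder is truthy (non-zero), which is why only odd numbers are printed. The same logic in plain Python, as a small sketch:

```python
numbers = range(10)

# x % 2 is 1 for odd numbers (truthy) and 0 for even numbers (falsy),
# so using it as a predicate keeps the odd numbers.
odds = [x for x in numbers if x % 2]

# To keep the even numbers instead, compare the remainder with zero.
evens = [x for x in numbers if x % 2 == 0]

print(odds)   # [1, 3, 5, 7, 9]
print(evens)  # [0, 2, 4, 6, 8]
```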

Interactive Rx - Filtering


In [20]:
def interactiveFilter(observer, count, divisor):
    sequence = range(1, count+1)
    subject = Observable.from_iterable(sequence)
    filtered = subject.filter(lambda x: x % divisor)
    filtered.subscribe(observer)

In [21]:
callback = partial(interactiveFilter, print)
callback.__name__ = interactiveFilter.__name__

countSlider = widgets.IntSlider(min=2, max=50, value=10)
divisorSlider = widgets.IntSlider(min=2, max=10, value=2)

interact(callback, count=countSlider, divisor=divisorSlider)


1
3
5
7
9
Out[21]:
functools.partial(<function interactiveFilter at 0x0000001BFEBB6840>, <built-in function print>)

Transforming a Sequence


In [22]:
subject = Observable.from_iterable(range(10))
disposable = subject.map(lambda x : x * 2).subscribe(print)
disposable


0
2
4
6
8
10
12
14
16
18
Out[22]:
<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1bfebdc4e0>

Interactive Rx - Transformations


In [23]:
def interactiveTransformer(observer, count, code, debug=False):
    sequence = range(1, count+1)
    if debug:
        print(sequence, tuple(sequence))
    subject = Observable.from_iterable(sequence)
    # eval() executes whatever is typed into the text box; use it only in a local demo
    callback = eval(code)
    filtered = subject.map(callback)
    filtered.subscribe(observer)

In [24]:
callback = partial(interactiveTransformer, print)
callback.__name__ = interactiveTransformer.__name__

countSlider = widgets.IntSlider(min=2, max=50, value=10)
codeText = widgets.Text(value="lambda x: x % 2")
debugCheckBox = widgets.Checkbox(value=False)

interact(callback, count=countSlider, code=codeText, debug=debugCheckBox)


1
0
1
0
1
0
1
0
1
0
Out[24]:
functools.partial(<function interactiveTransformer at 0x0000001BFEBE7950>, <built-in function print>)

In [25]:
codeText = widgets.Text(value="lambda x: x % 2")

interact(callback, count=countSlider, code=codeText, debug=debugCheckBox)


1
0
1
0
1
0
1
0
1
0
Out[25]:
functools.partial(<function interactiveTransformer at 0x0000001BFEBE7950>, <built-in function print>)

Merge


In [26]:
xs = Observable.from_iterable(range(1, 6))
ys = Observable.from_iterable('abcde')
xsMys = xs.merge(ys)
xsMys.subscribe(print)


a
1
b
2
c
3
d
4
e
5
Out[26]:
<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x1bfebbd4e0>

Space-Time of rx

Marbles and Marble Diagrams

- = Timespan of 100 ms

x = on_error()

| = on_completed()
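
To make the legend concrete, the sketch below turns a marble string into timed events. `parse_marbles` is a hypothetical helper written for this illustration, not part of rx.testing. It assumes that only - advances the clock and that every other character other than x and | is an on_next value.

```python
def parse_marbles(marbles, tick_ms=100):
    """Hypothetical helper: turn a marble string into (time_ms, kind, value) tuples."""
    events = []
    time_ms = 0
    for ch in marbles:
        if ch == "-":
            time_ms += tick_ms          # each dash is one 100 ms timespan
        elif ch == "x":
            events.append((time_ms, "on_error", None))
            break                       # nothing follows an error
        elif ch == "|":
            events.append((time_ms, "on_completed", None))
            break                       # nothing follows completion
        else:
            events.append((time_ms, "on_next", ch))
    return events


print(parse_marbles("a-b-c-|"))
# [(0, 'on_next', 'a'), (100, 'on_next', 'b'), (200, 'on_next', 'c'), (300, 'on_completed', None)]
```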


In [27]:
# Imported for its side effect: it provides the from_marbles()/to_marbles() methods used below
from rx.testing import marbles

xs = Observable.from_marbles('a-b-c-|')

xs.to_blocking().to_marbles()


Out[27]:
'a-b-c-|'

In [28]:
xs = Observable.from_marbles('a-b-c-x-e')
ys = Observable.from_marbles('1-2-3-4-5')
xs.merge(ys).to_blocking().to_marbles()


Out[28]:
'1a-2b-3c-4x'

In [29]:
xs = Observable.from_marbles('1-2-3-x-4')
ys = Observable.from_marbles('1-2-3-4-5')
xs.merge(ys).to_blocking().to_marbles()


Out[29]:
'11-22-33-4x'

Note

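Both merged results end in 4x: the error (x) emitted by xs terminates the merged sequence, so the later items never appear. Items that xs and ys emit in the same time slot may arrive in either order.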
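
The behaviour in the two merge cells can be mimicked in plain Python. This is a toy sketch, not RxPY's merge: `parse_marbles` and `merge_marbles` are hypothetical helpers, only - is assumed to advance time, and completion markers are not handled. Items in the same time slot are emitted first-stream-first here, whereas the recorded RxPY output happens to show the opposite order.

```python
def parse_marbles(marbles, tick_ms=100):
    """Hypothetical helper: return (time_ms, char) pairs; only '-' advances time."""
    events, time_ms = [], 0
    for ch in marbles:
        if ch == "-":
            time_ms += tick_ms
        else:
            events.append((time_ms, ch))
    return events


def merge_marbles(first, second, tick_ms=100):
    """Toy merge: interleave by time and stop at the first error ('x')."""
    # sorted() is stable, so items sharing a time slot keep first-then-second order.
    events = sorted(
        parse_marbles(first, tick_ms) + parse_marbles(second, tick_ms),
        key=lambda event: event[0],
    )
    out, now = [], 0
    for time_ms, ch in events:
        out.append("-" * ((time_ms - now) // tick_ms))  # dashes for elapsed time
        now = time_ms
        out.append(ch)
        if ch == "x":
            break  # an error terminates the merged sequence
    return "".join(out)


print(merge_marbles("a-b-c-x-e", "1-2-3-4-5"))  # a1-b2-c3-x
```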

Interactive Rx - Marbles


In [30]:
def interactiveMarbles(stream1, stream2):
    xs = Observable.from_marbles(stream1)
    ys = Observable.from_marbles(stream2)
    print(xs.merge(ys).to_blocking().to_marbles())

In [31]:
stream1Marble = widgets.Text(value="1-2-3-4-5")
stream2Marble = widgets.Text(value="a-b-c-d-e")

interact(interactiveMarbles, stream1=stream1Marble, stream2=stream2Marble)


a1-b2-3c-4x

Python Sandbox


In [32]:
result = eval('lambda x: x * 2')
result
tuple(map(result, (1, 2, 3, 4)))


Out[32]:
(2, 4, 6, 8)