In [1]:
%run startup.py

In [2]:
%%javascript
$.getScript('./assets/js/ipython_notebook_toc.js')


A Decision Tree of Observable Operators

Part 7: Meta Operations

source: http://reactivex.io/documentation/operators.html#tree.
(transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, axiros)

This tree can help you find the ReactiveX Observable operator you’re looking for.
See Part 1 for Usage and Output Instructions.

We also require acquaintance with the marble diagrams feature of RxPy.

Table of Contents

I want to convert the entire sequence of items emitted by an Observable into some other data structure to_iterable/to_list, to_blocking, to_dict, to_future, to_marbles, to_set


In [3]:
rst(O.to_iterable)
s = marble_stream("a--b-c|")
l, ts = [], time.time()
def on_next(listed):
    print('got', listed, time.time()-ts)

for i in (1, 2):    
    d = s.subscribe(on_next)
    # second run: only one value, the list.
    s = s.to_list()
    # both are started around same time -> check time deltas



========== to_list ==========

module rx.linq.observable.tolist
@extensionmethod(Observable, alias="to_iterable")
def to_list(self):
    Creates a list from an observable sequence.

    Returns an observable sequence containing a single element with a list
    containing all the elements of the source sequence.
--------------------------------------------------------------------------------
('got', 'a', 0.011667013168334961)
('got', 'b', 0.22758698463439941)
('got', 'c', 0.335313081741333)
('got', ['a', 'b', 'c'], 0.3470189571380615)

In [4]:
rst(O.to_blocking)
ts = time.time()
s = O.interval(200).take(3)
sb = s.to_blocking()
# this is instant:
assert time.time() - ts < 0.2

print('''In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of methods, operating on the items emitted by the Observable, that block. Some of the To operators are in this Blocking Obsevable set of extended operations.''')
# -> diffing dir(s) with dir(sb) we get:
# __iter__
# for_each
# observable
rst(sb.__iter__)
for i in (1, 2):
    # not interleaved results:
    for it in sb:
        log(it)

rst(sb.for_each)        
sb.for_each(log)

header(".observable -> getting async again")
# interleaved again:
d = subs(sb.observable, name='observer 1')
d = subs(sb.observable, name='observer 2')



========== to_blocking ==========

module rx.linq.observable.toblocking
@extensionmethod(ObservableBase)
def to_blocking(self):
    n.a.
--------------------------------------------------------------------------------
In some implementations of ReactiveX, there is also an operator that converts an Observable into a “Blocking” Observable. A Blocking Observable extends the ordinary Observable by providing a set of methods, operating on the items emitted by the Observable, that block. Some of the To operators are in this Blocking Obsevable set of extended operations.


========== __iter__ ==========

module rx.linq.observable.blocking.toiterable
@extensionmethod(BlockingObservable)
def __iter__(self):
    Returns an iterator that can iterate over items emitted by this
    `BlockingObservable`.

    :param BlockingObservable self: Blocking observable instance.
    :returns: An iterator that can iterate over the items emitted by this
        `BlockingObservable`.
    :rtype: Iterable[Any]
--------------------------------------------------------------------------------
 204.0     M 0
 408.9     M 1
 615.1     M 2
 822.2     M 0
1027.2     M 1
1230.1     M 2


========== for_each ==========

module rx.linq.observable.blocking.foreach
@extensionmethod(BlockingObservable)
def for_each(self, action):
    Invokes a method on each item emitted by this BlockingObservable and
    blocks until the Observable completes.

    Note: This will block even if the underlying Observable is asynchronous.

    This is similar to Observable#subscribe(subscriber), but it blocks. Because
    it blocks it does not need the Subscriber#on_completed() or
    Subscriber#on_error(Throwable) methods. If the underlying Observable
    terminates with an error, rather than calling `onError`, this method will
    throw an exception.

    Keyword arguments:
    :param types.FunctionType action: the action to invoke for each item
        emitted by the `BlockingObservable`.
    :raises Exception: if an error occurs
    :returns: None
    :rtype: None
--------------------------------------------------------------------------------
 205.5   T28 0
 408.5   T29 1
 613.9   T30 2


========== .observable -> getting async again ==========


 615.9     M New subscription (observer 1) on stream 276608405

 617.6     M New subscription (observer 2) on stream 276608405
 821.8   T33 [next]  204.2: 0 -> observer 2   822.6   T32 [next]  206.4: 0 -> observer 1  

1025.0   T34 [next]  407.4: 1 -> observer 2  1024.8   T35 [next]  408.6: 1 -> observer 1  

1227.7   T36 [next]  610.1: 2 -> observer 2  1229.0   T37 [next]  612.8: 2 -> observer 1  

1229.4   T37 [cmpl]  613.2: fin -> observer 1  1228.5   T36 [cmpl]  610.9: fin -> observer 2  


In [5]:
rst(O.to_dict)
d = subs(O.from_('abc').to_dict(key_mapper=lambda x: x, element_mapper=lambda a: '%s%s' % (a, a)))



========== to_dict ==========

module rx.linq.observable.todict
@extensionmethod(ObservableBase)
to_dict(self, key_mapper, element_mapper=None)
    Converts the observable sequence to a Map if it exists.

    Keyword arguments:
    key_mapper -- {Function} A function which produces the key for the
        Map.
    element_mapper -- {Function} [Optional] An optional function which
        produces the element for the Map. If not present, defaults to the
        value from the observable sequence.
    Returns {Observable} An observable sequence with a single value of a Map
    containing the values from the observable sequence.
--------------------------------------------------------------------------------

   1.6     M New subscription on stream 276526817
   2.6     M [next]    0.7: {'a': 'aa', 'c': 'cc', 'b': 'bb'}
   2.8     M [cmpl]    0.8: fin

In [29]:
rst(O.to_future)
def emit(obs):    
    for ev in 'first', 'second':
        sleep(.5)
        log('emitting', ev)
        obs.on_next(ev)
    # vital for the future to get done:
    obs.on_completed()
    
    
try:
    # required for py2 (backport of guidos' tulip stuffs, now asyncio)
    # caution: people say this is not production ready and will never be.
    import trollius
    f = rx.Observable.create(emit).to_future(trollius.Future)
    # this is async, not a busy loop
    log('future.result():', f.result())
except: # notebook should always run all cells
    print ('skipping this; pip install trollius required')



========== to_future ==========

module rx.linq.observable.tofuture
@extensionmethod(ObservableBase)
def to_future(self, future_ctor=None):
    Converts an existing observable sequence to a Future

    Example:
    future = rx.return_value(42).to_future(trollius.Future);

    With config:
    rx.config["Future"] = trollius.Future
    future = rx.return_value(42).to_future()

    future_ctor -- {Function} [Optional] The constructor of the future.
        If not provided, it looks for it in rx.config.Future.

    Returns {Future} An future with the last value from the observable
    sequence.
--------------------------------------------------------------------------------
 506.5     M emitting first
1008.0     M emitting second
1008.7     M future.result(): second

In [34]:
rst(O.from_marbles)
d = subs(rx.Observable.from_string("1-(42)-3-|").to_blocking())



========== from_marbles ==========

module rx.testing.marbles
@extensionclassmethod(Observable, alias="from_string")
def from_marbles(cls, string, scheduler=None):
    Convert a marble diagram string to an observable sequence, using
    an optional scheduler to enumerate the events.

    Special characters:
    - = Timespan of 100 ms
    x = on_error()
    | = on_completed()

    All other characters are treated as an on_next() event at the given
    moment they are found on the string.

    Examples:
    1 - res = rx.Observable.from_string("1-2-3-|")
    2 - res = rx.Observable.from_string("1-(42)-3-|")
    3 - res = rx.Observable.from_string("1-2-3-x", rx.Scheduler.timeout)

    Keyword arguments:
    string -- String with marble diagram
    scheduler -- [Optional] Scheduler to run the the input sequence on.

    Returns the observable sequence whose elements are pulled from the
    given marble diagram string.
--------------------------------------------------------------------------------

   4.2     M New subscription on stream 276651201
  16.2   T21 [next]   12.0: 1
 127.7   T22 [next]  123.5: 42
 236.9   T25 [next]  232.7: 3
 347.0   T27 [cmpl]  342.7: fin

In [40]:
rst(O.to_set)
d = subs(O.from_("abcabc").to_set())



========== to_set ==========

module rx.linq.observable.toset
@extensionmethod(ObservableBase)
def to_set(self):
    Converts the observable sequence to a set.

    Returns {Observable} An observable sequence with a single value of a set
    containing the values from the observable sequence.
--------------------------------------------------------------------------------

   1.3     M New subscription on stream 276654145
   3.0     M [next]    1.5: set(['a', 'c', 'b'])
   3.4     M [cmpl]    1.9: fin

I want an operator to operate on a particular Scheduler: subscribe_on

Advanced feature: Adding side effects to subscription and unsubscription events.

This is a good read:

Plus see the other links on RX docu


In [54]:
rst(O.subscribe_on)

# start simple:
header('Switching Schedulers')
s = O.just(42, rx.scheduler.ImmediateScheduler())
d = subs(s.subscribe_on(rx.scheduler.TimeoutScheduler()), name='SimpleSubs')

sleep(0.1)

header('Custom Subscription Side Effects')

from rx.scheduler.newthreadscheduler import NewThreadScheduler
from rx.scheduler.eventloopscheduler import EventLoopScheduler

class MySched(NewThreadScheduler):
    '''For adding side effects at subscription and unsubscription time'''
    def schedule(self, action, state=None):
        log('new scheduling task', action)
        scheduler = EventLoopScheduler(
            thread_factory=self.thread_factory,
            exit_if_empty=True)
        return scheduler.schedule(action, state)
                
s = O.interval(200).take(2)
s = s.subscribe_on(MySched())
d = subs(s, name="subs1")
d = subs(s, name="subs2")



========== subscribe_on ==========

module rx.linq.observable.subscribeon
@extensionmethod(ObservableBase)
def subscribe_on(self, scheduler):
    Subscribe on the specified scheduler.

    Wrap the source sequence in order to run its subscription and
    unsubscription logic on the specified scheduler. This operation is not
    commonly used; see the remarks section for more information on the
    distinction between subscribe_on and observe_on.

    Keyword arguments:
    scheduler -- Scheduler to perform subscription and unsubscription
        actions on.

    Returns the source sequence whose subscriptions and unsubscriptions
    happen on the specified scheduler.

    This only performs the side-effects of subscription and unsubscription
    on the specified scheduler. In order to invoke observer callbacks on a
    scheduler, use observe_on.
--------------------------------------------------------------------------------


========== Switching Schedulers ==========

   3.6  T451 [next]    0.9: 42 (SimpleSubs)

   2.6     M New subscription on stream 276546517   3.8  T451 [cmpl]    1.2: fin (SimpleSubs)



========== Custom Subscription Side Effects ==========


 106.6     M New subscription on stream 276546553
 106.9     M new scheduling task <function action at 0x107fad5f0>

 107.9     M New subscription on stream 276546553
 108.5     M new scheduling task <function action at 0x107b82320>
 310.2  T454 [next]  203.5: 0 (subs1)
 311.6  T456 [next]  203.4: 0 (subs2)
 513.5  T457 [next]  406.8: 1 (subs1)
 515.4  T458 [next]  407.3: 1 (subs2) 513.8  T457 [cmpl]  407.2: fin (subs1)
 516.2  T458 [cmpl]  408.1: fin (subs2)

...when it notifies observers observe_on

Via this you can add side effects on any notification to any subscriber.

This example shall demonstrate whats going on:


In [44]:
rst(O.observe_on)
from rx.scheduler.newthreadscheduler import NewThreadScheduler

header('defining a custom thread factory for a custom scheduler')
def my_thread_factory(target, args=None): 
    'just to show that also here we can customize'
    t = threading.Thread(target=target, args=args or [])
    t.setDaemon(True)
    print ('\ncreated %s\n' % t.getName())
    return t


class MySched:
    def __init__(self):
        self.rx_sched = NewThreadScheduler(my_thread_factory)
        
    def __getattr__(self, a):
        'called whenever the observe_on scheduler is on duty'
        log('RX called', a, 'on mysched\n')
        return getattr(self.rx_sched, a)
        
mysched = MySched()        
s = O.interval(200).take(3) #.delay(100, mysched)

d = subs(s.observe_on(mysched))

sleep(2)
print 'all threads after finish:' # all cleaned up
print (' '.join([t.name for t in threading.enumerate()]))



========== observe_on ==========

module rx.linq.observable.observeon
@extensionmethod(ObservableBase)
def observe_on(self, scheduler):
    Wraps the source sequence in order to run its observer callbacks on
    the specified scheduler.

    Keyword arguments:
    scheduler -- Scheduler to notify observers on.

    Returns the source sequence whose observations happen on the specified
    scheduler.

    This only invokes observer callbacks on a scheduler. In case the
    subscription and/or unsubscription actions have side-effects
    that require to be run on a scheduler, use subscribe_on.
--------------------------------------------------------------------------------


========== defining a custom thread factory for a custom scheduler ==========


   2.8     M New subscription on stream 276529577
 208.2  T392 RX called schedule on mysched


created Thread-393

 209.5  T393 [next]  206.4: 0
 210.2  T393 RX called schedule on mysched


created Thread-395

 414.0  T394 RX called schedule on mysched
 415.5  T396 [next]  412.4: 1


created Thread-396
 416.2  T396 RX called schedule on mysched



created Thread-398

 619.0  T397 RX called schedule on mysched
 620.3  T399 [next]  617.3: 2


created Thread-399
 620.5  T399 RX called schedule on mysched


 622.4  T400 [cmpl]  619.4: fin
created Thread-400


 622.9  T400 RX called schedule on mysched


created Thread-402

all threads after finish:
MainThread Thread-2 IPythonHistorySavingThread Thread-1 Thread-3

I want an Observable to invoke a particular action when certain events occur do_action/tap, finally_action


In [66]:
rst(O.do_action)
def say(v=None):
    if v:
       log('NI!', v)
    else:
       log('EOF')
    
d = subs(O.range(10, 10).take(2).tap(say, on_completed=say))



========== do_action ==========

module rx.linq.observable.doaction
@extensionmethod(Observable, alias="tap")
def do_action(self, on_next=None, on_error=None, on_completed=None,
    Invokes an action for each element in the observable sequence and
    invokes an action upon graceful or exceptional termination of the
    observable sequence. This method can be used for debugging, logging,
    etc. of query behavior by intercepting the message stream to run
    arbitrary actions for messages on the pipeline.

    1 - observable.do_action(observer)
    2 - observable.do_action(on_next)
    3 - observable.do_action(on_next, on_error)
    4 - observable.do_action(on_next, on_error, on_completed)

    observer -- [Optional] Observer, or ...
    on_next -- [Optional] Action to invoke for each element in the
        observable sequence.
    on_error -- [Optional] Action to invoke upon exceptional termination
        of the observable sequence.
    on_completed -- [Optional] Action to invoke upon graceful termination
        of the observable sequence.

    Returns the source sequence with the side-effecting behavior applied.
--------------------------------------------------------------------------------

   3.4     M New subscription on stream 276579277
   4.1     M NI! 10
   4.5     M [next]    1.0: 10
   4.7     M NI! 11
   4.9     M [next]    1.5: 11
   5.1     M EOF
   5.2     M [cmpl]    1.8: fin

In [72]:
rst(O.finally_action)
d = subs(O.on_error('err').take(2).finally_action(say))



========== finally_action ==========

module rx.linq.observable.finallyaction
@extensionmethod(ObservableBase)
def finally_action(self, action):
    Invokes a specified action after the source observable sequence
    terminates gracefully or exceptionally.

    Example:
    res = observable.finally(lambda: print('sequence ended')

    Keyword arguments:
    action -- {Function} Action to invoke after the source observable
        sequence terminates.
    Returns {Observable} Source sequence with the action-invoking
    termination behavior applied.
--------------------------------------------------------------------------------

   2.2     M New subscription on stream 276553221
   3.0     M [err ]    0.6: err
   3.5     M EOF

I want an Observable that will notify observers of an error throw**


In [76]:
rst(O.throw)
d = subs(O.range(1, 3).concat(O.on_error("ups")))



========== throw ==========

module rx.linq.observable.throw
@extensionclassmethod(Observable, alias="throw_exception")
def on_error(cls, exception, scheduler=None):
    Returns an observable sequence that terminates with an exception,
    using the specified scheduler to send out the single OnError message.

    1 - res = rx.throw(Exception('Error'))
    2 - res = rx.throw(Exception('Error'),
                                            rx.Scheduler.timeout)

    Keyword arguments:
    exception -- An object used for the sequence's termination.
    scheduler -- Scheduler to send the exceptional termination call on. If
        not specified, defaults to ImmediateScheduler.

    Returns the observable sequence that terminates exceptionally with the
    specified exception object.
--------------------------------------------------------------------------------

   1.9     M New subscription on stream 276578025
   2.6     M [next]    0.6: 1
   3.0     M [next]    0.9: 2
   3.3     M [next]    1.2: 3
   3.6     M [err ]    1.5: ups

...if a specified period of time elapses without it emitting an item timeout / timeout_with_selector**


In [85]:
rst(O.timeout)
d = subs(marble_stream("a-b---c|").timeout(200, O.just('timeout')))
# this also works with absolute time. See docstring:



========== timeout ==========

module rx.linq.observable.timeout
@extensionmethod(ObservableBase)
def timeout(self, duetime, other=None, scheduler=None):
    Returns the source observable sequence or the other observable
    sequence if duetime elapses.

    1 - res = source.timeout(new Date()); # As a date
    2 - res = source.timeout(5000); # 5 seconds
    # As a date and timeout observable
    3 - res = source.timeout(datetime(), rx.return_value(42))
    # 5 seconds and timeout observable
    4 - res = source.timeout(5000, rx.return_value(42))
    # As a date and timeout observable
    5 - res = source.timeout(datetime(), rx.return_value(42),
                             rx.Scheduler.timeout)
    # 5 seconds and timeout observable
    6 - res = source.timeout(5000, rx.return_value(42),
                             rx.Scheduler.timeout)

    Keyword arguments:
    :param datetime|int duetime: Absolute (specified as a datetime object) or
        relative time (specified as an integer denoting milliseconds) when a
        timeout occurs.
    :param Observable other: Sequence to return in case of a timeout. If not
        specified, a timeout error throwing sequence will be used.
    :param Scheduler scheduler: Scheduler to run the timeout timers on. If not
        specified, the timeout scheduler is used.

    :returns: The source sequence switching to the other sequence in case of
        a timeout.
    :rtype: Observable
--------------------------------------------------------------------------------

   4.2     M New subscription on stream 276577449
  17.1  T521 [next]   12.8: a
 127.8  T522 [next]  123.5: b
 332.7  T530 [next]  328.4: timeout
 333.0  T530 [cmpl]  328.7: fin

In [11]:
rst(O.timeout_with_selector)
d = subs(marble_stream("2-2-1-1|")\
         .timeout_with_selector(
              # you get the value and can adjust the timeout accordingly:
              timeout_duration_mapper=lambda x: O.timer(100 * int(x)),
              other=O.just('timeout')))



========== timeout_with_selector ==========

module rx.linq.observable.timeoutwithselector
@extensionmethod(ObservableBase)
def timeout_with_selector(self, first_timeout=None,
                          timeout_duration_mapper=None, other=None):
    Returns the source observable sequence, switching to the other
    observable sequence if a timeout is signaled.

    1 - res = source.timeout_with_selector(rx.Observable.timer(500))
    2 - res = source.timeout_with_selector(rx.Observable.timer(500),
                lambda x: rx.Observable.timer(200))
    3 - res = source.timeout_with_selector(rx.Observable.timer(500),
                lambda x: rx.Observable.timer(200)),
                rx.return_value(42))

    first_timeout -- [Optional] Observable sequence that represents the
        timeout for the first element. If not provided, this defaults to
        rx.never().
    timeout_Duration_mapper -- [Optional] Selector to retrieve an
        observable sequence that represents the timeout between the current
        element and the next element.
    other -- [Optional] Sequence to return in case of a timeout. If not
        provided, this is set to rx.throw().

    Returns the source sequence switching to the other sequence in case of
    a timeout.
--------------------------------------------------------------------------------

   3.9     M New subscription on stream 276596365
  17.0   T74 [next]   12.9: 2
 126.1   T76 [next]  122.1: 2
 237.1   T77 [next]  233.1: 1
 341.6   T86 [next]  337.5: timeout
 341.9   T86 [cmpl]  337.8: fin

I want an Observable to recover gracefully

...from a timeout by switching to a backup Observable timeout / timeout_with_selector**

(example: see above)

...from an upstream error notification catch_exception, on_error_resume_next**


In [20]:
rst(O.catch_exception)
fubar1 = O.on_error('Ups')
fubar2 = O.on_error('Argh')
good = O.just(42)
d = subs(O.catch(fubar1, fubar2, good))


rst(O.on_error_resume_next)

bucket = [0]
def emitter(obs):
    v = bucket[-1]
    bucket.append(v)
    for i in range(0, len(bucket) + 2):
        obs.on_next(i)
    if len(bucket) > 2:
        log('notify error')
        obs.on_error("ups")
    log('notify complete')
    obs.on_completed()
    
    
    
d = subs(O.on_error_resume_next(O.just('running'),
                                O.create(emitter),
                                O.create(emitter),
                                O.just('all good')
                               ))



========== catch_exception ==========

module rx.linq.observable.catch
@extensionclassmethod(Observable)
def catch(cls, *args):
    Continues an observable sequence that is terminated by an
    exception with the next observable sequence.

    1 - res = Observable.catch(xs, ys, zs)
    2 - res = Observable.catch([xs, ys, zs])

    Returns an observable sequence containing elements from consecutive
    source sequences until a source sequence terminates successfully.
--------------------------------------------------------------------------------

   3.5     M New subscription on stream 276515289
   4.5     M [next]    0.8: 42
   4.6     M [cmpl]    1.0: fin


========== on_error_resume_next ==========

module rx.linq.observable.onerrorresumenext
@extensionclassmethod(Observable)
def on_error_resume_next(cls, *args):
    Continues an observable sequence that is terminated normally or by
    an exception with the next observable sequence.

    1 - res = Observable.on_error_resume_next(xs, ys, zs)
    2 - res = Observable.on_error_resume_next([xs, ys, zs])
    3 - res = Observable.on_error_resume_next(xs, factory)

    Returns an observable sequence that concatenates the source sequences,
    even if a sequence terminates exceptionally.
--------------------------------------------------------------------------------

   2.2     M New subscription on stream 276515085
   2.6     M [next]    0.3: running
   2.8     M [next]    0.5: 0
   3.0     M [next]    0.7: 1
   3.1     M [next]    0.8: 2
   3.2     M [next]    0.9: 3
   3.3     M notify complete
   3.8     M [next]    1.6: 0
   4.3     M [next]    2.0: 1
   4.5     M [next]    2.2: 2
   4.7     M [next]    2.4: 3
   5.0     M [next]    2.7: 4
   5.2     M notify error
   5.4     M notify complete
   6.0     M [next]    3.8: all good
   6.4     M [cmpl]    4.2: fin

... by attempting to resubscribe to the upstream Observable retry


In [25]:
rst(O.retry)
ts = time.time()
def emit(obs):
    dt = time.time() - ts
    obs.on_next('try %s' % dt)
    if dt < 1:
        sleep(0.2)
        log('error')
        obs.on_error('ups')
    obs.on_completed()
    
d = subs(O.create(emit).retry(10))



========== retry ==========

module rx.linq.observable.retry
@extensionmethod(ObservableBase)
def retry(self, retry_count=None):
    Repeats the source observable sequence the specified number of times
    or until it successfully terminates. If the retry count is not
    specified, it retries indefinitely.

    1 - retried = retry.repeat()
    2 - retried = retry.repeat(42)

    retry_count -- [Optional] Number of times to retry the sequence. If not
    provided, retry the sequence indefinitely.

    Returns an observable sequence producing the elements of the given
    sequence repeatedly until it terminates successfully.
--------------------------------------------------------------------------------

   1.4     M New subscription on stream 276578989
   1.8     M [next]    0.3: try 0.00107383728027
 204.6     M error
 205.8     M [next]  204.3: try 0.205071926117
 408.9     M error
 410.7     M [next]  409.2: try 0.409952878952
 613.3     M error
 614.7     M [next]  613.2: try 0.613961935043
 819.0     M error
 820.0     M [next]  818.4: try 0.819205999374
1024.7     M error
1025.2     M [next] 1023.7: try 1.02444982529
1025.4     M [cmpl] 1023.9: fin

I want to create a resource that has the same lifespan as the Observable using

http://www.introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Using:

The Using factory method allows you to bind the lifetime of a resource to the lifetime of an observable sequence. The signature itself takes two factory methods; one to provide the resource and one to provide the sequence. This allows everything to be lazily evaluated.

This mechanism can find varied practical applications in the hands of an imaginative developer. The resource being an IDisposable is convenient; indeed, it makes it so that many types of resources can be bound to, such as other subscriptions, stream reader/writers, database connections, user controls and, with Disposable(Action), virtually anything else.


In [13]:
rst(O.using)
#d = subs(O.interval(1000).take(2))

lifetime = 2000
def result(disposable_resource_fac):
    return O.just(disposable_resource_fac).delay(lifetime)

d2 = subs(O.using(lambda: subs(O.interval(100).take(1000), name='resource fac\n'),
                  result), name='outer stream\n')



========== using ==========

module rx.linq.observable.using
@extensionclassmethod(Observable)
def using(cls, resource_factory, observable_factory):
    Constructs an observable sequence that depends on a resource object,
    whose lifetime is tied to the resulting observable sequence's lifetime.

    1 - res = rx.Observable.using(lambda: AsyncSubject(), lambda: s: s)

    Keyword arguments:
    resource_factory -- Factory function to obtain a resource object.
    observable_factory -- Factory function to obtain an observable sequence
        that depends on the obtained resource.

    Returns an observable sequence whose lifetime controls the lifetime of
    the dependent resource object.
--------------------------------------------------------------------------------

   2.2     M New subscription (outer stream) on stream 276622297

   2.6     M New subscription (resource fac) on stream 276622093
 106.8  T127 [next]  104.1: 0 -> resource fac
  
 213.1  T129 [next]  210.5: 1 -> resource fac
  
 318.8  T130 [next]  316.2: 2 -> resource fac
  
 420.2  T131 [next]  417.5: 3 -> resource fac
  
 521.3  T132 [next]  518.7: 4 -> resource fac
  
 622.3  T133 [next]  619.7: 5 -> resource fac
  
 724.3  T134 [next]  721.7: 6 -> resource fac
  
 828.5  T135 [next]  825.8: 7 -> resource fac
  
 929.4  T136 [next]  926.7: 8 -> resource fac
  
1031.6  T137 [next] 1028.9: 9 -> resource fac
  
1133.8  T138 [next] 1131.1: 10 -> resource fac
  
1239.0  T139 [next] 1236.4: 11 -> resource fac
  
1342.8  T140 [next] 1340.1: 12 -> resource fac
  
1446.2  T141 [next] 1443.5: 13 -> resource fac
  
1550.1  T142 [next] 1547.5: 14 -> resource fac
  
1655.1  T143 [next] 1652.4: 15 -> resource fac
  
1757.7  T144 [next] 1755.0: 16 -> resource fac
  
1858.7  T145 [next] 1856.1: 17 -> resource fac
  
1961.2  T146 [next] 1958.5: 18 -> resource fac
  
2005.6  T128 [next] 2003.4: <rx.disposable.Disposable.Disposable object at 0x107d04450> -> outer stream
  
2006.0  T128 [cmpl] 2003.7: fin -> outer stream
  

I want to subscribe to an Observable and receive a Future that blocks until the Observable completes start, start_async, to_async


In [6]:
rst(O.start)
def starter():
    # called only once, async:
    return 'start: ', time.time()
s = O.start(starter).concat(O.from_('abc'))
d = subs(s, name='sub1')
d = subs(s, name='sub2')



========== start ==========

module rx.linq.observable.start
@extensionclassmethod(Observable)
def start(cls, func, scheduler=None):
    Invokes the specified function asynchronously on the specified
    scheduler, surfacing the result through an observable sequence.

    Example:
    res = rx.Observable.start(lambda: pprint('hello'))
    res = rx.Observable.start(lambda: pprint('hello'), rx.Scheduler.timeout)

    Keyword arguments:
    func -- {Function} Function to run asynchronously.
    scheduler -- {Scheduler} [Optional] Scheduler to run the function on. If
        not specified, defaults to Scheduler.timeout.

    Returns {Observable} An observable sequence exposing the function's
    result value, or an exception.

    Remarks:
    The function is called immediately, not during the subscription of the
    resulting sequence. Multiple subscriptions to the resulting sequence can
    observe the function's result.
--------------------------------------------------------------------------------

   2.1     M New subscription (sub1) on stream 276594277
   2.6     M [next]    0.2: ('start: ', 1482383023.989834) -> sub1  
   2.9     M [next]    0.6: a -> sub1  
   3.1     M [next]    0.7: b -> sub1  
   3.2     M [next]    0.8: c -> sub1  
   3.5     M [cmpl]    1.1: fin -> sub1  

   3.9     M New subscription (sub2) on stream 276594277
   4.2     M [next]    0.2: ('start: ', 1482383023.989834) -> sub2  
   4.6     M [next]    0.6: a -> sub2  
   4.9     M [next]    1.0: b -> sub2  
   5.1     M [next]    1.2: c -> sub2  
   5.3     M [cmpl]    1.3: fin -> sub2  

In [23]:
rst(O.start_async)

def emit(obs):
    
    for ev in 'first', 'second':
        sleep(.2)
        log('emitting', ev)
        obs.on_next(ev)
    # vital for the future to get done:
    obs.on_completed()
    
def future():
    # only called once:
    log('called future')
    future = trollius.Future()
    future.set_result(('42', time.time()))
    future.set_exception(Exception('ups'))
    return future
    
try:
    # required for py2 (backport of guidos' tulip stuffs, now asyncio)
    # caution: people say this is not production ready and will never be.
    import trollius
    s = O.start_async(future)
    d = subs(s, name='subs1')
    # same result:
    d = subs(s, name='subs2')
except Exception as ex: # notebook should always run all cells
    print ('%s skipping this; pip install trollius required' % ex)



========== start_async ==========

module rx.linq.observable.startasync
@extensionclassmethod(Observable)
def start_async(cls, function_async):
    Invokes the asynchronous function, surfacing the result through an
    observable sequence.

    Keyword arguments:
    :param types.FunctionType function_async: Asynchronous function which
        returns a Future to run.

    :returns: An observable sequence exposing the function's result value, or an
        exception.
    :rtype: Observable
--------------------------------------------------------------------------------
   1.8     M called future

   2.1     M New subscription (subs1) on stream 276008249
   2.9     M [err ]    0.2: FINISHED: <Future finished result=('42', 1482384109.227631)> -> subs1  

   3.5     M New subscription (subs2) on stream 276008249
   3.9     M [err ]    0.1: FINISHED: <Future finished result=('42', 1482384109.227631)> -> subs2  

In [25]:
rst(O.to_async)
d = subs(O.to_async(lambda x, y: x + y)(4, 3) )



========== to_async ==========

module rx.linq.observable.toasync
@extensionclassmethod(Observable)
def to_async(cls, func, scheduler=None):
    Converts the function into an asynchronous function. Each invocation
    of the resulting asynchronous function causes an invocation of the
    original synchronous function on the specified scheduler.

    Example:
    res = Observable.to_async(lambda x, y: x + y)(4, 3)
    res = Observable.to_async(lambda x, y: x + y, Scheduler.timeout)(4, 3)
    res = Observable.to_async(lambda x: log.debug(x),
                              Scheduler.timeout)('hello')

    func -- {Function} Function to convert to an asynchronous function.
    scheduler -- {Scheduler} [Optional] Scheduler to run the function on. If
        not specified, defaults to Scheduler.timeout.

    Returns {Function} Asynchronous function.
--------------------------------------------------------------------------------

   2.9     M New subscription (9223) on stream 276564689
   3.4     M [next]    0.4: 7 -> 9223  
   3.6     M [cmpl]    0.5: fin -> 9223  

In [ ]: