In [1]:
# Example: from_marbles
import rx
try:
rx.Observable.from_marbles('a-b|')
except Exception as ex:
print 'error:', ex # shown only after ipython notebook kernel restart
In [2]:
# -> to see whats there don't use e.g. `dir(Observable)` but find
# 'def from_marbles' in the rx directory, to see the module,
# then import it:
'''
~/GitHub/RxPY/rx $ ack 'def from_marbl'
testing/marbles.py
90:def from_marbles(self, scheduler=None):
'''
import rx.testing.marbles
def show(x): print (x)
stream = rx.Observable.from_marbles('a-b--c|').to_blocking().subscribe(show)
It is useful to understand on a high level how RxPY handles asyncronity and when.
E.g. naively you might want to know, when notifying a value to a subscriber, what other subscribers are present.
This makes no sense to ask (I think in general in reactive programming) and it will be clear looking at an example.
Consider timing and thread outputs in the following:
In [38]:
# =============================
# change these (both in millis)
delay_stream, slow_emit = 0, 0
# =============================
import rx, threading, random, time
thread = threading.currentThread
def call_observer(obs):
'''observer functions are invoked, blocking'''
print_out(obs.__class__.__name__, hash(obs))
for i in range(2):
obs.on_next(1)
if slow_emit:
time.sleep(slow_emit/1000)
obs.on_next(1)
stream = rx.Observable.create(call_observer).take(10)
if delay_stream:
stream = stream.delay(delay_stream)
def print_out(*v):
'''printout of current time, v, and current thread'''
v_pretty = ' '.join([str(s) for s in v])
print ('%.8f - %30s - %s\n' % (time.time(), v_pretty, thread().getName()))
d = stream.subscribe(lambda x: print_out('Observer 1', x))
d = stream.subscribe(lambda x: print_out('Observer 2', x))
=> In the
call_observer
function you can't know about the concurrency situation It soleley depends on the design of the stream operations applied. See.ref_count()
though, for published streams
Check the .observe_on
example to get a deeper understanding how scheduling works.
In [ ]: