MultiGraph

Create 3 views:

  • A view on a randomly generated stock price
  • A view on a moving average of the last 50 stock prices
  • A view on a moving average of the last 150 stock prices

In [1]:
from streamsx.topology.topology import Topology
from streamsx.topology import context
from some_module import jsonRandomWalk, movingAverage
#from streamsx import rest
import json

# Define operators
rw = jsonRandomWalk()
ma_150 = movingAverage(150)
ma_50 = movingAverage(50)

# Define the topology (it is submitted in a later cell)
top = Topology("myTop")
ticker_price = top.source(rw)
ma_150_stream = ticker_price.map(ma_150)
ma_50_stream = ticker_price.map(ma_50)
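
The cell above imports jsonRandomWalk and movingAverage from some_module, which is not shown in this notebook. The following is only a minimal sketch of what such callables might look like, not the actual module. It assumes each tuple is a dict with a 'val' key (inferred from the 'val' argument passed to multi_graph_every later on), that the source is a callable returning an iterable, and that the average is taken over fewer values until the window fills. The start, step, and seed parameters are hypothetical.

```python
import random
from collections import deque
from itertools import islice


class jsonRandomWalk:
    """Hypothetical source: calling the instance returns an endless
    iterator of {'val': price} dicts that follow a random walk."""

    def __init__(self, start=100.0, step=1.0, seed=None):
        self.start = start
        self.step = step
        self.seed = seed

    def __call__(self):
        rng = random.Random(self.seed)
        price = self.start
        while True:
            # Move the price up or down by at most `step`.
            price += rng.uniform(-self.step, self.step)
            yield {'val': price}


class movingAverage:
    """Hypothetical map function: returns the mean of the last n values seen."""

    def __init__(self, n):
        # A bounded deque drops the oldest value once n values are stored.
        self.window = deque(maxlen=n)

    def __call__(self, tup):
        self.window.append(tup['val'])
        return {'val': sum(self.window) / len(self.window)}


# Local check without a Streams instance: take a few prices and average them.
prices = list(islice(jsonRandomWalk(seed=1)(), 5))
ma_2 = movingAverage(2)
averages = [ma_2(p)['val'] for p in prices]
```

Keeping the window state inside the callable instance is what lets the same class serve both the 50-price and the 150-price average.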

Code the user can supply to view the streaming data

Given the raw price stream and the two moving-average streams derived from it, create 3 separate views, one per stream, to obtain the data.


In [2]:
ticker_view = ticker_price.view()
ma_150_view = ma_150_stream.view()
ma_50_view = ma_50_stream.view()

Submit To A Distributed Streams Installation


In [3]:
context.submit("DISTRIBUTED", top.graph, username="streamsadmin", password="passw0rd")


2016-11-17 02:07:39,455 - streamsx.topology.py_submit - INFO - Generating SPL and submitting application.
Nov 17, 2016 2:07:42 AM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: Invoking spl-make-toolkit
Nov 17, 2016 2:07:42 AM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: /opt/ibm/InfoSphere_Streams/4.2.0.0/bin/spl-make-toolkit --make-operator -i /home/streamsadmin/git/streamsx.topology/samples/python/topology/notebooks/MultiGraph/tk7034316274430990980
Nov 17, 2016 2:07:44 AM com.ibm.streamsx.topology.internal.streams.InvokeMakeToolkit invoke
INFO: spl-make-toolkit complete: return code=0
Nov 17, 2016 2:07:44 AM com.ibm.streamsx.topology.internal.streams.InvokeSc getToolkitPath
INFO: ToolkitPath:/home/streamsadmin/git/streamsx.topology/com.ibm.streamsx.topology:/opt/ibm/InfoSphere_Streams/4.2.0.0/toolkits
Nov 17, 2016 2:07:44 AM com.ibm.streamsx.topology.internal.streams.InvokeSc invoke
INFO: Invoking SPL compiler (sc) for main composite: myTop::myTop
Nov 17, 2016 2:07:44 AM com.ibm.streamsx.topology.internal.streams.InvokeSc invoke
INFO: /opt/ibm/InfoSphere_Streams/4.2.0.0/bin/sc --rebuild-toolkits --optimized-code-generation --num-make-threads=4 -M myTop::myTop -t /home/streamsadmin/git/streamsx.topology/com.ibm.streamsx.topology:/opt/ibm/InfoSphere_Streams/4.2.0.0/toolkits
Nov 17, 2016 2:07:45 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
SEVERE: CDISP0005I The toolkit.xml file in the /home/streamsadmin/git/streamsx.topology/com.ibm.streamsx.topology directory is out of date. The compiler will rebuild the toolkit by using the spl-make-toolkit command.
Nov 17, 2016 2:07:50 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Checking the constraints.
Nov 17, 2016 2:07:50 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Creating the types.
Nov 17, 2016 2:07:50 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Creating the functions.
Nov 17, 2016 2:07:50 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Creating the operators.
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Creating the standalone application.
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Creating the application model.
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: Building the binaries.
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-type] tuple<rstring jsonString>
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-type] tuple<blob __spl_po>
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-operator] jsonRandomWalk_0
Nov 17, 2016 2:07:51 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-operator] identity_3
Nov 17, 2016 2:07:58 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-operator] movingAverage_1
Nov 17, 2016 2:07:58 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [CXX-standalone] standalone
Nov 17, 2016 2:07:58 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [LD-standalone] standalone
Nov 17, 2016 2:08:00 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [LN-standalone] standalone
Nov 17, 2016 2:08:02 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [LD-so] so jsonRandomWalk_0
Nov 17, 2016 2:08:02 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [LD-so] so identity_3
Nov 17, 2016 2:08:03 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [LD-so] so movingAverage_1
Nov 17, 2016 2:08:04 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO:  [Bundle] myTop.myTop.sab
Nov 17, 2016 2:08:09 AM com.ibm.streamsx.topology.internal.streams.InvokeSc invoke
INFO: SPL compiler complete: return code=0
Nov 17, 2016 2:08:09 AM com.ibm.streamsx.topology.internal.context.BundleStreamsContext doSPLCompile
INFO: Streams Application Bundle produced: myTop.myTop.sab
Nov 17, 2016 2:08:09 AM com.ibm.streamsx.topology.internal.streams.InvokeSubmit fileJobConfig
INFO: JobConfig: {"jobConfigOverlays":[{"deploymentConfig":{"fusionScheme":"legacy"}}]}
Nov 17, 2016 2:08:09 AM com.ibm.streamsx.topology.internal.streams.InvokeSubmit invoke
INFO: Invoking streamtool submitjob /home/streamsadmin/git/streamsx.topology/samples/python/topology/notebooks/MultiGraph/myTop.myTop.sab
Nov 17, 2016 2:08:09 AM com.ibm.streamsx.topology.internal.streams.InvokeSubmit invoke
INFO: /opt/ibm/InfoSphere_Streams/4.2.0.0/bin/streamtool submitjob --outfile /tmp/streamsjobid1635887254317458064txt --jobConfig /tmp/streamsjco4076417921980903738.json /home/streamsadmin/git/streamsx.topology/samples/python/topology/notebooks/MultiGraph/myTop.myTop.sab
Nov 17, 2016 2:08:15 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: CDISC0079I The following number of applications were submitted to the StreamsInstance instance: 1. The instance is in the StreamsDomain domain.
Nov 17, 2016 2:08:17 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: CDISC0080I The 71 job was submitted to the StreamsInstance instance in the StreamsDomain domain.
Nov 17, 2016 2:08:17 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: CDISC0020I Submitted job IDs: 71
Nov 17, 2016 2:08:17 AM com.ibm.streamsx.topology.internal.process.ProcessOutputToLogger run
INFO: CDISC0081I The job IDs that were submitted were written to the following output file: /tmp/streamsjobid1635887254317458064txt.
Nov 17, 2016 2:08:21 AM com.ibm.streamsx.topology.internal.streams.InvokeSubmit invoke
INFO: streamtool submitjob complete: return code=0
Nov 17, 2016 2:08:22 AM com.ibm.streamsx.topology.internal.streams.InvokeSubmit invoke
INFO: Bundle: myTop.myTop.sab submitted with jobid: 71

Graph The Stock Price & Moving Averages


In [5]:
# Use the interactive notebook backend so the figure can be redrawn in place
%matplotlib notebook

from streamsx.rest import multi_graph_every

l = [ticker_view, ma_150_view, ma_50_view]
multi_graph_every(l, 'val', 1.0)


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-5-e84dcc610379> in <module>()
      5 
      6 l = [ticker_view, ma_150_view, ma_50_view]
----> 7 multi_graph_every(l, 'val', 1.0)

/home/streamsadmin/git/streamsx.topology/com.ibm.streamsx.topology/opt/python/packages/streamsx/rest.py in multi_graph_every(views, key, time_step)
    593             ax.set_ylim(np.amin(ydatas[0]) - 1.0, np.amax(ydatas[0]) + 1.0)
    594             count += 1
--> 595         fig.canvas.draw()
    596         ydatas = []
    597 

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/backends/backend_webagg_core.py in draw(self)
    177         backend_agg.RendererAgg.lock.acquire()
    178         try:
--> 179             self.figure.draw(renderer)
    180         finally:
    181             backend_agg.RendererAgg.lock.release()

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/artist.py in draw_wrapper(artist, renderer, *args, **kwargs)
     60     def draw_wrapper(artist, renderer, *args, **kwargs):
     61         before(artist, renderer)
---> 62         draw(artist, renderer, *args, **kwargs)
     63         after(artist, renderer)
     64 

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/figure.py in draw(self, renderer)
   1157         dsu.sort(key=itemgetter(0))
   1158         for zorder, a, func, args in dsu:
-> 1159             func(*args)
   1160 
   1161         renderer.close_group('figure')

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/artist.py in draw_wrapper(artist, renderer, *args, **kwargs)
     60     def draw_wrapper(artist, renderer, *args, **kwargs):
     61         before(artist, renderer)
---> 62         draw(artist, renderer, *args, **kwargs)
     63         after(artist, renderer)
     64 

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/axes/_base.py in draw(self, renderer, inframe)
   2317 
   2318         for zorder, a in dsu:
-> 2319             a.draw(renderer)
   2320 
   2321         renderer.close_group('axes')

/home/streamsadmin/anaconda3/lib/python3.5/site-packages/matplotlib/artist.py in draw_wrapper(artist, renderer, *args, **kwargs)
     60     def draw_wrapper(artist, renderer, *args, **kwargs):
     61         before(artist, renderer)
---> 62         draw(artist, renderer, *args, **kwargs)
     63         after(artist, renderer)
     64 

KeyboardInterrupt: 
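
The KeyboardInterrupt traceback above is what the notebook prints when the plotting loop is stopped by hand while it is redrawing the figure. One way to stop without the traceback is to wrap the call and treat the interrupt as a normal exit. The sketch below is an assumption about how one might do this; run_until_interrupted is a hypothetical helper, not part of streamsx.

```python
def run_until_interrupted(plot_fn, *args, **kwargs):
    """Hypothetical helper: call plot_fn and treat a kernel interrupt
    (KeyboardInterrupt) as a normal stop.

    Returns True if the call was interrupted, False if it returned on its own.
    """
    try:
        plot_fn(*args, **kwargs)
    except KeyboardInterrupt:
        return True
    return False


# In the notebook this might be used as (assuming the names from the cell above):
#   run_until_interrupted(multi_graph_every, l, 'val', 1.0)
```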
