In [ ]:
!pip install --upgrade "git+https://github.com/IBMStreams/streamsx.health#egg=streamsx_health.ingest&subdirectory=ingest/common/python/package"
Open the Bluemix web portal and log in or sign up for a free Bluemix account.
Ensure that your Streaming Analytics service is running in Bluemix.
If you don’t have a service, you can create one as follows:
Streaming Analytics
and then click on it. You must provide the information that your streaming app needs to access the service. Run the cells below to provide your service name and credentials. If you are not prompted to enter the service credentials, click Kernel -> Restart on the menu bar and rerun the cells.
In [ ]:
service_name = input("Streaming Analytics service name:")
In [ ]:
import getpass
credentials=getpass.getpass('Streaming Analytics credentials:')
Tip:
To copy your service credentials, open the Streaming Analytics service dashboard click Service Credentials, then View Credentials, and finally click the Copy icon and paste your service credentials when prompted.
In this exercise, you will develop a simple streaming application to process sample data.
Topology
import, from streamsx.topology
streamsx.topology.context
module jsonStr
and assign it the sample patient data shown in the section belowjson
module, and load the JSON data into a dictionary called dictObj
time
module and slow your source by one seconddictObj
with each iteration through the above codepatientData
from the data source you defined above.{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource":{"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.heath/1.0"}}}
In [ ]:
#Imports
#Set up access to Streaming Analytics service
vs={'streaming-analytics': [{'name': service_name, 'credentials': json.loads (credentials)}]}
cfg = {}
cfg[ConfigParams.VCAP_SERVICES] = vs
cfg[ConfigParams.SERVICE_NAME] = service_name
# Define data source
# Create Topology and read from data source
# Print
# Submit on Bluemix
In this exercise, you will modify the application to only handle heart rate from the patient readings.
code
value is not '8867-4'. Assign the name heartRate
to the filtered stream. sink()
function to sink the heartRate
stream instead.schema
module from streamsx.topology
.Observations
source with a Subscribe call to the patient simulator. The topic to subscribe to is 'ingest-beacon'
.
In [ ]:
In this exercise, you are going to 'anonymize' patient data by hashing identifying information.
hashlib
modulepatientId
and locationId
using the sha256() algorithmITEM_TO_ENCODE = hashlib.sha256(ITEM_TO_ENCODE.encode('utf-8')).digest()
patientData
stream by calling the anonymize method. Name the new stream patientX.
This step should precede the filter.In this exercise, you will keep state of the last 10 tuples from the patient data stream. For each new tuple that comes in, you will calculate the moving average from the last 10 tuples.
getReadingValue
function from the streamsx_health.ingest.Observation
module.last_n
, which is a list that keeps track of the last n tuples.n
, which is the number of tuples over which to calculate the averagen
, pop the oldest tuple.avgHr
In [ ]:
In [ ]:
The following code shows how to fetch view data from a view that we have set up in a Streams application.
deque
from the collections module. plotQueue
that holds up to 2000 tuples.data_collector
function that iterates through your view data and appends each value to plotQueue
.data_collector
in a background thread and save data in plotQueue
- plotQueue
will be used to visualize data in the next cell.
In [ ]:
#Imports
# Create a buffer of 2000 tuples for plotting
#Fetch view data
#Define data collector function
#Start a background thread and save view data in deque
The following cell shows how to view realtime data in Jupyter Notebook.
The code updates the view once every second.
Data is plotted from the plotQueue
variable.
As long as the data view and the view is running, you can start and stop the following cell when you work on your analytics and visualizations.
In [ ]:
# Visualize view data in a line graph
%matplotlib inline
import time
from IPython import display
import pylab as pl
pl.rcParams['figure.figsize'] = (14.0, 8.0)
while (True):
pl.clf()
ax = pl.gca()
ax.set_autoscale_on(False)
ax.plot(plotQueue)
ax.axis([0, 2000, 50, 120])
display.display(pl.gcf())
print(len(plotQueue))
display.clear_output(wait=True)
time.sleep(1.0)
In [ ]: