Prerequisites

Ensure you are using a Python 3.5 kernel to run this notebook.

Setting up the streamsx.health module

For this course, you'll need the streamsx.health module. Run the cell below to install it:


In [ ]:
!pip install --upgrade "git+https://github.com/IBMStreams/streamsx.health#egg=streamsx_health.ingest&subdirectory=ingest/common/python/package"

Setting up Bluemix

Open the Bluemix web portal and log in or sign up for a free Bluemix account.

Ensure that your Streaming Analytics service is running in Bluemix.

If you don’t have a service, you can create one as follows:

  1. Click Catalog or Create Service, browse for Streaming Analytics and then click on it.
  2. Follow the instructions on the Streaming Analytics catalog page and type the service name to set up your service.

  3. Click Create to open the Streaming Analytics service dashboard. Your service will start automatically.

Set up access to the service

You must provide the information that your streaming app needs to access the service. Run the cells below to provide your service name and credentials. If you are not prompted to enter the service credentials, click Kernel -> Restart on the menu bar and rerun the cells.


In [ ]:
service_name = input("Streaming Analytics service name:")

In [ ]:
import getpass
credentials=getpass.getpass('Streaming Analytics credentials:')

Tip: To copy your service credentials, open the Streaming Analytics service dashboard, click Service Credentials, then View Credentials, and finally click the Copy icon. Paste your service credentials when prompted.

Lab 1 - Create a simple Python application

Step 1 - Develop a basic Python application

In this exercise, you will develop a simple streaming application to process sample data.

Instructions

  1. Import the necessary modules from the streamsx package. You will need:
    • the Topology class, from streamsx.topology.topology
    • everything in the streamsx.topology.context module

  2. Create a function, called 'Observations'
    • Create a string jsonStr and assign it the sample patient data shown in the section below
    • Import the json module, and load the JSON data into a dictionary called dictObj
    • Import the time module and pause for one second before each tuple to slow down your source
    • Yield dictObj with each iteration through the above code
    • Finally, to simulate a continuous source, generate the data infinitely.

  3. Create a new topology. Then, create a new stream patientData from the data source you defined above.
  4. Print the data stream in the console.
  5. Submit the application over Bluemix.

Sample Data

{"patientId":"patient-1", "device":{"id":"VitalsGenerator", "locationId":"bed1"}, "readingSource":{"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, "reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, "readingType": {"code":"8867-4", "system":"streamsx.heath/1.0"}}}
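
As one possible reading of step 2 above, the source can be written as a Python generator that parses the sample reading and yields it forever. This is a minimal sketch, not the reference solution: the delay parameter is a hypothetical addition (the instructions only call for a one-second pause) so that the function can be tried out without waiting.

```python
import json
import time


def Observations(delay=1.0):
    """Yield the sample reading forever, pausing `delay` seconds per tuple.

    `delay` is a hypothetical parameter added for this sketch; it defaults
    to the one-second pause that the instructions ask for.
    """
    jsonStr = (
        '{"patientId":"patient-1", '
        '"device":{"id":"VitalsGenerator", "locationId":"bed1"}, '
        '"readingSource":{"id":123, "deviceId":"VitalsGenerator", "sourceType":"generated"}, '
        '"reading": {"ts": 605, "uom":"bpm", "value":82.56785326532197, '
        '"readingType": {"code":"8867-4", "system":"streamsx.heath/1.0"}}}'
    )
    while True:  # simulate a continuous source
        time.sleep(delay)
        # Parse on every iteration so each tuple is a fresh dictionary
        dictObj = json.loads(jsonStr)
        yield dictObj


# Local check without the one-second pause
sample = next(Observations(delay=0))
print(sample["patientId"], sample["reading"]["value"])
```

Because delay has a default value, the function can still be called with no arguments, which is how a topology source would be expected to invoke it.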


In [ ]:
#Imports

#Set up access to Streaming Analytics service
vs = {'streaming-analytics': [{'name': service_name, 'credentials': json.loads(credentials)}]}
cfg = {}
cfg[ConfigParams.VCAP_SERVICES] = vs
cfg[ConfigParams.SERVICE_NAME] = service_name

# Define data source
        
# Create Topology and read from data source 

# Print

# Submit on Bluemix

Lab 2 - Handle a diversity of patient data

Step 1 - Filter data using lambda function

In this exercise, you will modify the application to only handle heart rate from the patient readings.

  1. Filter out all the readings whose code value is not '8867-4'. Assign the name heartRate to the filtered stream.
  2. Update the sink() call so that it sinks the heartRate stream instead.
  3. Submit the application over Bluemix and view the output on your Streams Console.
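
The filter condition in step 1 can be tried out on plain dictionaries before it is applied to a stream. The sketch below uses Python's built-in filter on a small list; the second reading and its code value "other-code" are made up for illustration. On a stream, the same predicate would be passed to the stream's filter method, for example patientData.filter(is_heart_rate).

```python
# Predicate: keep only readings whose code is the heart rate code '8867-4'
is_heart_rate = lambda t: t["reading"]["readingType"]["code"] == "8867-4"

# Two illustrative readings; "other-code" is a placeholder, not a real code
readings = [
    {"patientId": "patient-1",
     "reading": {"value": 82.5, "readingType": {"code": "8867-4"}}},
    {"patientId": "patient-1",
     "reading": {"value": 36.9, "readingType": {"code": "other-code"}}},
]

heartRate = list(filter(is_heart_rate, readings))
print(len(heartRate))
```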

Step 2 - Submit the patient simulator job

  1. On the Streams Console, click the 'Submit Job' button
  2. Leave the instance field as its default value
  3. Select 'Specify the URL of the application bundle' and enter the following URL: https://github.com/IBMStreams/streamsx.health/releases/download/v0.1/com.ibm.streamsx.health.simulate.beacon.services.HealthDataBeaconService.sab
  4. Click 'Submit.'

Step 3 - Subscribe to the simulator

  1. Import the schema module from streamsx.topology.
  2. Replace your Observations source with a Subscribe call to the patient simulator. The topic to subscribe to is 'ingest-beacon'.
  3. Make sure the simulator is running before you submit any subsequent applications.
  4. Submit your application over Bluemix and view the output.

In [ ]:

Lab 3 - Anonymize and average data

Step 1 - Anonymize Patient Data

In this exercise, you are going to 'anonymize' patient data by hashing identifying information.

  1. Import the hashlib module
  2. Make a function called anonymize that...
    • Has a single parameter
    • Hashes patientId and locationId using the sha256() algorithm
      You can use: ITEM_TO_ENCODE = hashlib.sha256(ITEM_TO_ENCODE.encode('utf-8')).digest()
    • Returns the modified tuple
  3. Modify the content of each tuple on the patientData stream by calling the anonymize method. Name the new stream patientX. This step should precede the filter.
  4. Submit over Bluemix and view the output.
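
A minimal sketch of the function described in step 2, assuming tuples have the same layout as the sample data in Lab 1 (patientId at the top level, locationId nested under device). Note that digest() returns bytes, so the hashed fields are no longer strings.

```python
import hashlib


def anonymize(tuple_):
    """Replace patientId and locationId with their SHA-256 digests."""
    tuple_["patientId"] = hashlib.sha256(
        tuple_["patientId"].encode("utf-8")).digest()
    tuple_["device"]["locationId"] = hashlib.sha256(
        tuple_["device"]["locationId"].encode("utf-8")).digest()
    return tuple_


# Local check on a dictionary shaped like the sample data
example = {"patientId": "patient-1",
           "device": {"id": "VitalsGenerator", "locationId": "bed1"}}
hashed = anonymize(example)
print(hashed["patientId"].hex())
```

On a stream, a function like this would typically be applied to each tuple with the stream's map method, which is an assumption about how step 3 is meant to be implemented.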

Step 2 - Keeping State

In this exercise, you will keep state of the last 10 tuples from the patient data stream. For each new tuple that comes in, you will calculate the moving average from the last 10 tuples.

  1. Import the getReadingValue function from the streamsx_health.ingest.Observation module.
  2. Create a new callable class. The class should have a field called last_n, which is a list that keeps track of the last n tuples.
    • The __init__ method of the class should take a single parameter, n, which is the number of tuples over which to calculate the average
    • The __call__ method should take the incoming tuple and append its heart rate reading to the list. If the length of the list > n, pop the oldest value.
    • Return the average of all values from the list

  3. Calculate the moving average of heart rate, over the last 10 tuples, calling the new stream avgHr
  4. Submit the application over Bluemix, and view the results - a simple list of average heart rate readings.
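
One way to sketch the stateful callable from step 2, assuming n is supplied when the object is constructed and __call__ receives each incoming tuple. The class name MovingAverage is hypothetical, and reading_value is a local stand-in for getReadingValue so that the example runs without the streamsx.health module; it assumes the value sits under reading.value as in the sample data.

```python
def reading_value(tuple_):
    """Stand-in for getReadingValue (assumed to return the reading's value)."""
    return tuple_["reading"]["value"]


class MovingAverage:
    """Callable that averages the heart rate over the last n tuples."""

    def __init__(self, n):
        self.n = n
        self.last_n = []  # most recent readings, oldest first

    def __call__(self, tuple_):
        self.last_n.append(reading_value(tuple_))
        if len(self.last_n) > self.n:
            self.last_n.pop(0)  # drop the oldest reading
        return sum(self.last_n) / len(self.last_n)


# Local check with a window of 3 readings
avg = MovingAverage(3)
for value in (80, 82, 84, 86):
    print(avg({"reading": {"value": value}}))
```

Keeping the window in an instance field is what lets the average carry over from one tuple to the next; a plain function would start from scratch on every call.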

In [ ]:

Lab 4 - Visualize data in Python notebook

Step 1 - Viewing Data

  1. Before the print statement, create a view from the Average Heart Rate stream.
  2. Launch the application and proceed to the cells below.

In [ ]:

Step 2a - Fetch View Data

The following code shows how to fetch view data from a view that we have set up in a Streams application.

  1. Import deque from the collections module.
  2. Create a deque called plotQueue that holds up to 2000 tuples.
  3. Start fetching data for your view.
  4. Create a data_collector function that iterates through your view data and appends each value to plotQueue.
  5. Run data_collector in a background thread and save data in plotQueue - plotQueue will be used to visualize data in the next cell.
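
The buffering pattern in steps 2 to 5 can be sketched with the standard library alone. Here a plain queue.Queue stands in for the queue of view data, which is an assumption about what fetching returns; the three values and the None end marker exist only so that the sketch terminates.

```python
import queue
import threading
from collections import deque

# Buffer that keeps at most the 2000 most recent values
plotQueue = deque([], 2000)


def data_collector(view_queue, plot_queue):
    """Move items from view_queue into plot_queue until None is received."""
    for value in iter(view_queue.get, None):
        plot_queue.append(value)


# Stand-in for the view's data queue, pre-filled with made-up readings
view_queue = queue.Queue()
for value in (71.0, 72.5, 74.0):
    view_queue.put(value)
view_queue.put(None)  # end marker used only in this sketch

# Collect in the background so the notebook stays responsive
collector = threading.Thread(
    target=data_collector, args=(view_queue, plotQueue), daemon=True)
collector.start()
collector.join(timeout=5)
print(len(plotQueue))
```

A deque with a maximum length silently discards the oldest entries once it is full, so the plot always shows the latest window of data without any manual trimming.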

In [ ]:
#Imports

# Create a buffer of 2000 tuples for plotting

#Fetch view data

#Define data collector function

#Start a background thread and save view data in deque

Step 2b - Visualize View Data using Matplotlib

The following cell shows how to view realtime data in Jupyter Notebook. The code updates the view once every second.
Data is plotted from the plotQueue variable.

As long as the application and its view are running, you can start and stop the following cell while you work on your analytics and visualizations.


In [ ]:
# Visualize view data in a line graph
%matplotlib inline
import time
from IPython import display
import pylab as pl

pl.rcParams['figure.figsize'] = (14.0, 8.0)

while True:
    pl.clf()
    ax = pl.gca()
    ax.set_autoscale_on(False)
    ax.plot(plotQueue)
    ax.axis([0, 2000, 50, 120])
    display.display(pl.gcf())
    print(len(plotQueue))
    display.clear_output(wait=True)
    time.sleep(1.0)

In [ ]: