Healthcare Python Streaming Application Demo

This Python notebook shows how you can create an app that ingests and analyzes streaming data from a feed, and then visualizes the data in the notebook. You submit this app to be built in the Streaming Analytics service instance in Bluemix.

The app is a Python Topology app written with the Streams Python API. The app subscribes to the topic containing the patient data, performs analysis on the waveforms and creates a view to access the results of the analysis.

Submitting the Python application from the notebook allows data to be retrieved from the view. Once the data has been retrieved, it can be analyzed, manipulated or visualized like any other data accessed from a notebook. In the case of this demo, waveform graphs and numerical widgets display the healthcare data of the patient.

Familiarity with Python is recommended. This notebook runs on Python 3.5.

The following diagram outlines the architecture of the demo.

Part 1: Setup

Complete the following steps to set up your DSX environment.

1.1 Install the Python modules

Run the following cell to install the latest versions of the streamsx package and the healthdemo package.


In [ ]:
!pip install --user --upgrade streamsx
!pip install --user --upgrade "git+https://github.com/IBMStreams/streamsx.health.git#egg=healthdemo&subdirectory=samples/HealthcareJupyterDemo/package"
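
A quick way to confirm that the installation worked is to check that the interpreter can locate both packages. The following is a minimal sketch that uses only the standard library. The helper name missing_modules is hypothetical and is not part of streamsx or healthdemo; the module names are taken from the import statements used later in this notebook.

```python
import importlib.util

def missing_modules(names):
    """Return the top-level module names that cannot be located for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]

# Module names taken from the imports used later in this notebook.
required = ["streamsx", "healthdemo"]
missing = missing_modules(required)
if missing:
    print("Not installed or not on the path:", ", ".join(missing))
else:
    print("All required modules were found.")
```

If a module is reported as missing right after installation, rerunning the install cell and restarting the notebook kernel may help.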

1.2 Start a Streaming Analytics service instance

If you have a Streaming Analytics instance in IBM Bluemix, make sure that it's up and running.

If you don't have a Streaming Analytics instance, create one with these steps:

  1. Go to the Bluemix web portal and log in (or sign up for a free Bluemix account).
  2. Click Catalog, browse for the Streaming Analytics service and then click on it.
  3. Follow the instructions on the Streaming Analytics catalog page to set up your service instance, and then click Create. The service dashboard opens and your service instance starts automatically.

1.3 Set up access to the service instance

You must provide the information that your streaming app needs to access the service instance.

Run the following cell, and then enter your service credentials.

Tip: To copy and paste your service credentials, go to your service dashboard, click Service Credentials, and then click View Credentials.


In [ ]:
import json
import getpass
from streamsx.topology.context import ConfigParams
credentials=getpass.getpass('Streaming Analytics credentials:')

vs={'streaming-analytics': [{'name': 'Streaming Analytics', 'credentials': json.loads(credentials)}]}
config={ConfigParams.VCAP_SERVICES: vs, ConfigParams.SERVICE_NAME: 'Streaming Analytics'}
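
Because the credentials are pasted in as text, a malformed paste only shows up as a JSON parsing error. The sketch below builds the same structure as the vs dictionary above, but reports a clearer message when the input is not a JSON object. The function name build_vcap_services is hypothetical, and the keys in the sample string are placeholders rather than the actual fields of a Streaming Analytics credential.

```python
import json

def build_vcap_services(credentials_json, service_name="Streaming Analytics"):
    """Parse pasted credentials and wrap them in the structure used above."""
    try:
        credentials = json.loads(credentials_json)
    except json.JSONDecodeError as err:
        raise ValueError("Credentials are not valid JSON: {}".format(err))
    if not isinstance(credentials, dict):
        raise ValueError("Credentials must be a JSON object")
    return {
        "streaming-analytics": [
            {"name": service_name, "credentials": credentials}
        ]
    }

# Placeholder credentials; the keys shown here are illustrative only.
sample = '{"userid": "example-user", "password": "example-password"}'
vs_example = build_vcap_services(sample)
print(vs_example["streaming-analytics"][0]["name"])
```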

Part 2: Create a data feed

We'll use a precompiled IBM Streams application to simulate patient data and publish it to a topic. A microservice is a small IBM Streams application. The PhysionetIngestServiceMulti microservice retrieves patient waveform and vital data from a Physionet database (https://www.physionet.org/) using three different sets of data. It publishes the patient data to the ingest-physionet topic so that the data feed can be consumed by the app you're going to build later in this notebook.

2.1 Submit the precompiled Physionet microservice

  1. Download the PhysionetIngestServiceMulti.sab file to your local machine from https://github.com/IBMStreams/streamsx.health/releases.
  2. Go to the Streaming Analytics service instance and submit the SAB file by using the Streams application development console.
  3. When prompted, set the number of patients to 3 or more.

Part 3: Build a streaming app

Now you're ready to create the HealthcareDemo Python streaming application, and submit it to the Streaming Analytics service instance, where the app will be remotely built and deployed.

3.1 Healthcare Patient Python Topology Application

This cell contains the source code for the Python Topology application. The application ingests the patient data from the ingest-physionet topic, analyzes it to calculate vital data for all patients, and then creates a view for inspecting the results of the analysis.


In [ ]:
from streamsx.topology import schema
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit

## The healthdemo package provides tools to analyse patient data
## See https://github.com/IBMStreams/streamsx.health/tree/develop/samples/HealthcareJupyterDemo/package
from healthdemo.patientmonitoring_functions import streaming_rpeak
from healthdemo.healthcare_functions import GenTimestamp, aggregate
from healthdemo.windows import SlidingWindow

topo = Topology('HealthcareDemo')

## The ingest-physionet topic provides data at a rate of 125 tuples/sec
sample_rate = 125

## Subscribe to the topic
patients_data = topo.subscribe('ingest-physionet', schema.CommonSchema.Json)
            
## Add timestamp to the data, so we can perform windowing
patients_data = patients_data.transform(GenTimestamp(sample_rate))

## Generate a window based on the provided sample_rate
patients_data = patients_data.transform(SlidingWindow(length=sample_rate, trigger=sample_rate-1))

## Aggregate the data within the window and create a tuple
patients_data = patients_data.transform(aggregate)

## Process data from 'ECG Lead II' and calculate RPeak and RR delta
patients_data = streaming_rpeak(patients_data, sample_rate, data_label='ECG Lead II')

## Create view for viewing patient vital data
patients_vital = patients_data.view(name='patients_vital')

rc = submit('ANALYTICS_SERVICE', topo, config)
print ("DONE")
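
The windowing step can be pictured with a plain-Python sketch. This is not the healthdemo SlidingWindow implementation, whose internals are not shown in this notebook. It assumes a count-based window that keeps the most recent length tuples and emits a snapshot after every trigger new tuples. The class name CountSlidingWindow and its insert method are hypothetical.

```python
from collections import deque

class CountSlidingWindow:
    """Keep the newest `length` items; emit a snapshot every `trigger` inserts."""

    def __init__(self, length, trigger):
        self._buffer = deque(maxlen=length)  # old items fall off automatically
        self._trigger = trigger
        self._since_last_emit = 0

    def insert(self, item):
        """Add one item; return the window contents when triggered, else None."""
        self._buffer.append(item)
        self._since_last_emit += 1
        if self._since_last_emit >= self._trigger:
            self._since_last_emit = 0
            return list(self._buffer)
        return None

# Small numbers make the behavior easy to follow.
window = CountSlidingWindow(length=4, trigger=2)
for sample in range(6):
    snapshot = window.insert(sample)
    if snapshot is not None:
        print(snapshot)
```

In a sketch like this, a downstream aggregation function would receive each emitted snapshot and reduce it to a single tuple.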

Part 4: Visualization

4.1 Set up graphs for plotting patient vitals

This cell initializes the nine graphs which will be used to display one patient's vital data.

Each property of the patient's vital data is identified by the signal label. Each graph is initialized by providing the signal label it plots and a title.


In [ ]:
from healthdemo.medgraphs import ECGGraph, PoincareGraph, NumericText, ABPNumericText

## Select which patient's data to plot
patientId = 'patient-1'

graph = {
    'leadII_poincare': PoincareGraph(signal_label='Poincare - ECG Lead II', title='Poincare - ECG Lead II'),
    'ecg_leadII_graph': ECGGraph(signal_label='ECG Lead II', title='ECG Lead II', 
                                 plot_width=600, min_range=-0.5, max_range=2.0),
    'ecg_leadV_graph': ECGGraph(signal_label='ECG Lead V', title='ECG Lead V', plot_width=600),
    'resp_graph': ECGGraph(signal_label='Resp', title='Resp', min_range=-1, max_range=3, plot_width=600),
    'pleth_graph': ECGGraph(signal_label='Pleth', title='Pleth', min_range=0, max_range=5, plot_width=600),
    'hr_numeric': NumericText(signal_label='HR', title='HR', color='#7cc7ff'),
    'pulse_numeric': NumericText(signal_label='PULSE', title='PULSE', color='#e71d32'),
    'spo2_numeric': NumericText(signal_label='SpO2', title='SpO2', color='#8cd211'),
    'abp_numeric': ABPNumericText(abp_sys_label='ABP Systolic', abp_dia_label='ABP Diastolic', 
                                  title='ABP', color='#fdd600')            
}
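
For context on the Poincaré graph: a Poincaré plot of heart-rate data places each RR interval (the time between consecutive R peaks) against the interval that follows it. The sketch below shows that pairing under simple assumptions: R peaks are given as sample indices, and the sample rate is 125 samples per second as in this demo. The function names and the peak positions are made up for illustration and do not come from the healthdemo package.

```python
def rr_intervals(peak_indices, sample_rate):
    """Convert R-peak sample indices into RR intervals in seconds."""
    return [
        (later - earlier) / sample_rate
        for earlier, later in zip(peak_indices, peak_indices[1:])
    ]

def poincare_points(rr):
    """Pair each RR interval with the next one: (RR[n], RR[n+1])."""
    return list(zip(rr, rr[1:]))

# Made-up peak positions, for illustration only.
peaks = [0, 125, 250, 350]
rr = rr_intervals(peaks, sample_rate=125)
points = poincare_points(rr)
print(rr)
print(points)
```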

4.2 Provide data for the graphs

This cell is responsible for populating the graph objects with data from the view.

The view data contains vital data for all patients, and is continuously retrieved from the Streaming Analytics service in a background job. Each graph object receives data for a specified patient. Each graph object extracts and stores the data that is relevant for that particular graph.


In [ ]:
from healthdemo.utils import get_patient_id

## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%autoreload 1

continue_data_collection = True

## retrieve data from Streams view in a background job
def data_collector(view, g):
    queue = view.start_data_fetch()
    while continue_data_collection:
        tup = queue.get()
        if patientId == get_patient_id(tup):
            for graphtype in g:
                g[graphtype].add(tup)
    view.stop_data_fetch()
            
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, patients_vital, graph)
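
In the cell above, the stop flag is only checked after queue.get() returns, so the loop can wait indefinitely if no more tuples arrive. The following sketch shows one possible variation that uses a threading.Event and a timeout on get(). It assumes that the object returned by start_data_fetch() behaves like a standard queue.Queue and that each tuple is a dictionary with a patientId key; both are assumptions, and the function names are hypothetical.

```python
import queue
import threading

def collect(source, patient_id, sink, stop_event, poll_seconds=0.1):
    """Move tuples for one patient from source to sink until asked to stop."""
    while not stop_event.is_set():
        try:
            # The timeout lets the loop re-check stop_event when no data arrives.
            tup = source.get(timeout=poll_seconds)
        except queue.Empty:
            continue
        # Assumed tuple layout: a dict with a 'patientId' key.
        if tup.get("patientId") == patient_id:
            sink.append(tup)

def start_collector(source, patient_id, sink):
    """Run collect() in a daemon thread; return the thread and its stop event."""
    stop_event = threading.Event()
    worker = threading.Thread(
        target=collect,
        args=(source, patient_id, sink, stop_event),
        daemon=True,
    )
    worker.start()
    return worker, stop_event
```

To stop the collector, call set() on the returned event and then join() on the thread.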

4.3 Display the graphs

This cell is responsible for laying out and displaying the graphs.

Each time a call to update() is made on a graph object, the next data point is retrieved and displayed. Each graph object maintains an internal queue so that each time a call to update() is made, the next element in the queue is retrieved and removed.

There is a loop that continuously calls the update() method on each of the graphs for 60 seconds. After each graph has been updated, a call to push_notebook() is made, which causes the notebook to update the graphics.


In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox

## display graphs for a patient
show(
    row(
        column(
            graph['ecg_leadII_graph'].get_figure(), 
            graph['ecg_leadV_graph'].get_figure(), 
            graph['resp_graph'].get_figure(),
            graph['pleth_graph'].get_figure()
        ), 
        column(
            graph['leadII_poincare'].get_figure(),
            widgetbox(graph['hr_numeric'].get_figure()),
            widgetbox(graph['pulse_numeric'].get_figure()),
            widgetbox(graph['spo2_numeric'].get_figure()),
            widgetbox(graph['abp_numeric'].get_figure())
        )
    ),
    
    # If using bokeh > 0.12.2, uncomment the following statement
    #notebook_handle=True
)

## Timeout (in seconds) before stopping the graph
timeout = 60
endtime = time.time() + timeout

cnt = 0
while time.time() < endtime:
    ## update graphs
    for graphtype in graph:
        graph[graphtype].update()

    ## update notebook 
    cnt += 1
    if cnt % 5 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)
    
# Stop data collection running in background thread
continue_data_collection = False
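
The queue-per-graph behavior and the periodic refresh described above can be sketched without Bokeh. This is an illustrative model only, not the healthdemo graph classes: BufferedSeries and run_updates are hypothetical names, and refresh stands in for the call to push_notebook().

```python
from collections import deque

class BufferedSeries:
    """Hold incoming values until the display loop asks for the next one."""

    def __init__(self):
        self._pending = deque()
        self.displayed = []

    def add(self, value):
        """Queue a value that has arrived from the data collector."""
        self._pending.append(value)

    def update(self):
        """Move the oldest pending value to the displayed list, if there is one."""
        if not self._pending:
            return None
        value = self._pending.popleft()
        self.displayed.append(value)
        return value

def run_updates(series_by_name, refresh, iterations, refresh_every=5):
    """Update every series once per iteration; refresh every few iterations."""
    for count in range(1, iterations + 1):
        for series in series_by_name.values():
            series.update()
        if count % refresh_every == 0:
            refresh()
```

Refreshing once every few updates, rather than after every update, reduces the number of redraws while the series continue to consume data at the same rate.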

To plot the graphs for a different patient, change patientId in section 4.1, and then rerun the cells in Part 4: Visualization.

Part 5: Summary and next steps

You learned how to create an app with Python that ingests and analyzes streaming data from a feed, and then visualizes the data in the notebook. You submitted this app to be built in the Streaming Analytics service instance in Bluemix.

Check out other notebooks in this series:

  • Build and use a data model in real time with the Python API.

Authors

James Cancilla is a software developer who specializes in streaming technology and cloud solutions.
Kendrick Wong is a software developer who specializes in streaming technology and cloud solutions.

Copyright © 2017 IBM. This notebook and its source code are released under the terms of the MIT License.

