Healthcare Python Streaming Application Demo

This application demonstrates how users can develop Python streaming applications on DSX. The DSX notebook ultimately submits two Streams applications to a local Streams cluster. The first is a pre-compiled SPL application that simulates patient waveform and vital data and publishes it to a topic. The second is a Python Topology application written with the Streams Python API. It subscribes to the topic containing the patient data, analyzes the waveforms, and sends all of the data, including the analysis results, to the Streams view server.

Submitting the Python application from the notebook makes it possible to connect to the Streams view server and retrieve the data. Once retrieved, the data can be analyzed, manipulated, or visualized like any other data accessed from a notebook. In this demo, waveform graphs and numerical widgets display the patient's healthcare data.

The following diagram outlines the architecture of the demo.

Cell Description

This cell is responsible for installing the Python modules required for running this notebook.


In [ ]:
!pip install --user --upgrade streamsx
!pip install --user --upgrade "git+https://github.com/IBMStreams/streamsx.health.git#egg=healthdemo&subdirectory=samples/HealthcareJupyterDemo/package"

Cell Description

This cell is responsible for building and submitting the Streams applications to the Streams cluster.

PhysionetIngestServiceMulti microservice

This microservice comes in the form of a pre-compiled SAB file. It retrieves patient waveform and vital data from a Physionet database (https://www.physionet.org/). Three different data sets are used as the source. The patient data is published to the ingest-physionet topic, which allows it to be consumed by downstream applications or services.


In [ ]:
from streamsx.topology.topology import Topology, schema
from streamsx.topology.context import ConfigParams, submit
from streamsx.topology import functions

print ('Submitting to streaming analytic service.')

# 'vcap_services.json' is created by running the "Create VCAP Service Credential" notebook
# Change the service name accordingly
config = {
    ConfigParams.VCAP_SERVICES: 'vcap_services.json',
    ConfigParams.SERVICE_NAME: 'Streaming-Analytics'
}

numPatients = 3
print ("Don't forget to submit patient ingest microservice manually. (Set num.patients >= 3)")

Cell Description

This cell is responsible for building and submitting the Streams applications to the Streams cluster.

Healthcare Patient Alert Python Topology Application

This cell contains source code for the Python Topology application. This is a Streaming Python application that ingests the patient data from the ingest-physionet topic, performs filtering on patients that require attention, and then sends the data to the Streams view server.
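
The filtering step can be tried out without a Streams instance. The following is a minimal sketch, not the healthdemo implementation: the dictionary layout of the tuples and the `read_pulse` accessor are hypothetical stand-ins for whatever `get_sampled_data_values` actually decodes. Only the threshold logic (a pulse strictly between 0 and 80) is taken from the notebook.

```python
# Hypothetical message layout, used only for illustration; in the notebook
# the real tuples are decoded by healthdemo.utils.get_sampled_data_values.
def make_pulse_filter(low, high, read_pulse):
    """Return a predicate that is True when low < pulse < high."""
    def needs_attention(tup):
        values = read_pulse(tup)
        if not values:          # no pulse reading in this tuple
            return False
        return low < values[0] < high
    return needs_attention

def read_pulse(tup):
    # Stand-in accessor for the hypothetical layout above.
    return tup.get('PULSE', [])

needs_attention = make_pulse_filter(0, 80, read_pulse)
sample = [{'PULSE': [72]}, {'PULSE': [95]}, {'PULSE': [0]}, {}]
flagged = [needs_attention(t) for t in sample]
print(flagged)
```

Building the predicate from a small factory keeps the thresholds in one place, and the explicit empty-list check avoids an IndexError on tuples that carry no pulse reading.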


In [ ]:
from healthdemo.utils import get_patient_id, get_sampled_data_values

topo = Topology('HealthcareDemo_PatientAlert')

def patientNeedsAttention(tup):
    pulse = get_sampled_data_values(tup, 'PULSE')[0]
    return 0 < pulse < 80

## Create a view that shows patients that require attention
patients_requiring_attention = topo.subscribe('ingest-physionet', schema.CommonSchema.Json) \
    .map(functions.identity) \
    .filter(patientNeedsAttention)

view = patients_requiring_attention.view()

submit('ANALYTICS_SERVICE', topo, config)
print ("DONE")

Cell Description

This cell creates the background job that accesses the view data.

The view data is continuously retrieved from the Streams view server in a background job. Each graph object represents a patient and receives data when that patient requires attention.
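
The background retrieval pattern can be exercised offline. This is a rough sketch under stated assumptions: a plain `queue.Queue` stands in for the object returned by `view.start_data_fetch()`, a `threading.Thread` stands in for the IPython background job, and the `patientId` field name is hypothetical. It shows the `iter(callable, sentinel)` idiom used to drain the queue and the 1-based to 0-based mapping from patient ID to list index.

```python
import queue
import re
import threading

NUM_PATIENTS = 3  # mirrors numPatients in the notebook

def route(q, buckets):
    """Drain q until a None sentinel; file each item under its patient."""
    for item in iter(q.get, None):
        match = re.search(r'\d+', item['patientId'])  # hypothetical field name
        if match is None:
            continue
        pid = int(match.group())
        if 1 <= pid <= len(buckets):      # IDs are 1-based, lists are 0-based
            buckets[pid - 1].append(item)

q = queue.Queue()
buckets = [[] for _ in range(NUM_PATIENTS)]
worker = threading.Thread(target=route, args=(q, buckets))
worker.start()

for pid in ('patient-1', 'patient-3', 'patient-7', 'patient-3'):
    q.put({'patientId': pid})
q.put(None)        # sentinel: tells the worker to stop
worker.join()

counts = [len(b) for b in buckets]
print(counts)
```

Items for patients outside the configured range (here `patient-7`) are dropped, not written to a wrong slot.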


In [ ]:
from healthdemo.medgraphs import NumericText
import re

## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport healthdemo.utils
%aimport healthdemo.medgraphs
%autoreload 1

## create the graphs ##
graphs = []

alert_div = '''
<div style="background-color: #152935; border: 3px solid black; height: 120px; width: 700px; font-family: arial; clear: both; color: {0}">
  <div style="margin-left: 5px; margin-top: 20px; font-size: 30px; float:left">{1} requires Attention!</div>
  <div style="margin-right: 5px; margin-top: 20px; font-size: 30px; float:right">pulse: {2} {3}</div>
</div>
'''

for idx in range(numPatients):
    pulse_numeric = NumericText(signal_label='PULSE', title='Patient-%d' % (idx+1), color='#e71d32', override_str=alert_div)
    graphs.append(pulse_numeric)

## retrieve data from Streams view in a background job ##
def data_collector(view, graphs):
    for d in iter(view.get, None):
        patientId = int(re.findall(r'\d+', get_patient_id(d))[0])
        # patient IDs are 1-based, so the last valid ID equals numPatients
        if 1 <= patientId <= numPatients:
            graphs[patientId-1].add(d)

from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, view.start_data_fetch(), graphs)

Cell Description

This cell is responsible for laying out and displaying the graphs. An infinite loop continuously calls the update() method on each of the graphs. Every 125 iterations (about once per second, given the 8 ms sleep per iteration), push_notebook() is called, which causes the notebook to refresh the graphics.
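
The update-and-refresh loop can be sketched without Bokeh. In the version below, `FakeGraph` and `run_loop` are hypothetical names, the loop is bounded so that it terminates, and a plain callback stands in for `push_notebook()`. With the notebook's values (a refresh every 125 passes and an 8 ms sleep), a refresh happens roughly once per second, not counting the time spent in `update()`.

```python
import time

class FakeGraph:
    """Stand-in for a healthdemo graph; only counts update() calls."""
    def __init__(self):
        self.updates = 0

    def update(self):
        self.updates += 1

def run_loop(graphs, refresh, iterations, refresh_every, delay=0.0):
    """Update every graph on each pass; call refresh() every refresh_every passes."""
    refreshes = 0
    for i in range(1, iterations + 1):
        for g in graphs:
            g.update()
        if i % refresh_every == 0:
            refresh()
            refreshes += 1
        time.sleep(delay)
    return refreshes

pushed = []
graphs = [FakeGraph(), FakeGraph()]
n = run_loop(graphs, lambda: pushed.append(1), iterations=250, refresh_every=125)
print(n, graphs[0].updates)
```

Separating the cheap per-pass update from the more expensive refresh is the design choice here: the graphs consume data at the sample rate while the browser is redrawn far less often.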


In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, widgetbox

plots = []

for g in graphs:
    plots.append(widgetbox(g.get_figure()))
    
## display graphs
print ("Monitoring patients' pulses...")
show(column(plots),
     # If using bokeh > 0.12.2, uncomment the following statement
     #notebook_handle=True
)

cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()

    ## update notebook
    cnt += 1
    if cnt % 125 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)

Cell Description

This cell is responsible for building and submitting the Streams applications to the Streams cluster.

Healthcare Patient Python Topology Application

This cell contains source code for the Python Topology application. As described in the above architecture, this is a Streaming Python application that ingests the patient data from the ingest-physionet topic, performs filtering and analysis on the data, and then sends the data to the Streams view server.
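
As a rough illustration of the RR analysis (this is not the `streaming_rpeak` implementation, whose internals are not shown in this notebook), an RR interval is the time between two consecutive R-peaks. Given peak positions in samples and the 125 Hz sample rate used here, the intervals and the Poincare pairs (each interval plotted against the next one) can be computed as follows. The peak positions are made up for the example.

```python
SAMPLE_RATE = 125  # Hz, same value as sample_rate in the notebook

def rr_intervals(peak_indices, sample_rate=SAMPLE_RATE):
    """Seconds between consecutive R-peaks, given their sample indices."""
    return [(b - a) / sample_rate
            for a, b in zip(peak_indices, peak_indices[1:])]

def poincare_pairs(rr):
    """Pair each RR interval with the next one: (RR[n], RR[n+1])."""
    return list(zip(rr, rr[1:]))

peaks = [10, 135, 260, 360]   # made-up R-peak positions, in samples
rr = rr_intervals(peaks)
pairs = poincare_pairs(rr)
print(rr)
print(pairs)
```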


In [ ]:
from healthdemo.patientmonitoring_functions import streaming_rpeak
from healthdemo.healthcare_functions import PatientFilter, GenTimestamp, aggregate
from healthdemo.windows import SlidingWindow

def getPatientView(patient_id):
    '''
    Select data of given patient_id, perform analysis and return view.
    
    Parameters
    ----------
    patient_id: int
      patient_id (1-based)
      
    Returns
    -------
    view: topology.View
      view data from Streams server
    '''
    topo = Topology('HealthcareDemo_Patient%d' % (patient_id))

    ## Ingest, preprocess and aggregate patient data
    sample_rate = 125
    patient_data = topo.subscribe('ingest-physionet', schema.CommonSchema.Json) \
                       .map(functions.identity) \
                       .filter(PatientFilter('patient-%d' % (patient_id))) \
                       .transform(GenTimestamp(sample_rate)) \
                       .transform(SlidingWindow(length=sample_rate, trigger=sample_rate-1)) \
                       .transform(aggregate)
                        
    ## Calculate RPeak and RR delta
    patient_data = streaming_rpeak(patient_data, sample_rate, data_label='ECG Lead II')

    ## Create a view of the data
    patient_view = patient_data.view()
    
    submit('ANALYTICS_SERVICE', topo, config)
    return patient_view
    
# Retrieve view for a patient    
patient_view = getPatientView(2)
    
print('DONE')

Cell Description

This cell initializes all of the graphs that will be used and creates the background job that accesses the view data.

The view data is continuously retrieved from the Streams view server in a background job. Each graph object receives a copy of the data and extracts and stores the part that is relevant to that particular graph. Each graph object also maintains an internal queue, so each call to update() removes the next element from the queue and displays it.
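
The add/update contract described above can be sketched with a small queue-backed class. `QueuedSignal` and the flat dictionary message layout are hypothetical and chosen for illustration; the real graph classes live in `healthdemo.medgraphs` and may differ in detail, for example in what they do when the queue is empty.

```python
from collections import deque

class QueuedSignal:
    """Hypothetical stand-in for a graph: add() buffers, update() consumes."""
    def __init__(self, signal_label):
        self.signal_label = signal_label
        self._pending = deque()
        self.current = None

    def add(self, message):
        # Keep only the readings for this object's own signal.
        if self.signal_label in message:
            self._pending.append(message[self.signal_label])

    def update(self):
        # Consume one buffered reading per call; keep the last value if empty.
        if self._pending:
            self.current = self._pending.popleft()
        return self.current

hr = QueuedSignal('HR')
for msg in ({'HR': 71, 'SpO2': 98}, {'SpO2': 97}, {'HR': 74}):
    hr.add(msg)

shown = [hr.update() for _ in range(3)]
print(shown)
```

Because every graph filters the shared stream in `add()`, the collector can hand the same message to all graphs without knowing which signals each one displays.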


In [ ]:
from healthdemo.medgraphs import ECGGraph, PoincareGraph, NumericText, ABPNumericText

## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport healthdemo.utils
%aimport healthdemo.medgraphs
%autoreload 1

## create the graphs ##
graphs = []

ecg_leadII_graph = ECGGraph(signal_label='ECG Lead II', title='ECG Lead II', plot_width=600, min_range=-0.5, max_range=2.0)
graphs.append(ecg_leadII_graph)

leadII_poincare = PoincareGraph(signal_label='Poincare - ECG Lead II', title='Poincare - ECG Lead II')
graphs.append(leadII_poincare)

ecg_leadV_graph = ECGGraph(signal_label='ECG Lead V', title='ECG Lead V', plot_width=600)
graphs.append(ecg_leadV_graph)

resp_graph = ECGGraph(signal_label='Resp', title='Resp', min_range=-1, max_range=3, plot_width=600)
graphs.append(resp_graph)

pleth_graph = ECGGraph(signal_label='Pleth', title='Pleth', min_range=0, max_range=5, plot_width=600)
graphs.append(pleth_graph)

hr_numeric = NumericText(signal_label='HR', title='HR', color='#7cc7ff')
graphs.append(hr_numeric)

pulse_numeric = NumericText(signal_label='PULSE', title='PULSE', color='#e71d32')
graphs.append(pulse_numeric)

spo2_numeric = NumericText(signal_label='SpO2', title='SpO2', color='#8cd211')
graphs.append(spo2_numeric)

abp_numeric = ABPNumericText(abp_sys_label='ABP Systolic', abp_dia_label='ABP Diastolic', title='ABP', color='#fdd600')
graphs.append(abp_numeric)

## retrieve data from Streams view in a background job ##
def data_collector(view, graphs):
    for d in iter(view.get, None):
        for g in graphs:
            g.add(d)
            
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, patient_view.start_data_fetch(), graphs)

Cell Description

This cell is responsible for laying out and displaying the graphs. An infinite loop continuously calls the update() method on each of the graphs. Every 5 iterations (about every 40 ms, given the 8 ms sleep per iteration), push_notebook() is called, which causes the notebook to refresh the graphics.


In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox

## display graphs
show(
    row(
        column(
            ecg_leadII_graph.get_figure(), 
            ecg_leadV_graph.get_figure(), 
            resp_graph.get_figure(),
            pleth_graph.get_figure()
        ), 
        column(
            leadII_poincare.get_figure(),
            widgetbox(hr_numeric.get_figure()),
            widgetbox(pulse_numeric.get_figure()),
            widgetbox(spo2_numeric.get_figure()),
            widgetbox(abp_numeric.get_figure())
        )
    ),
    
    # If using bokeh > 0.12.2, uncomment the following statement
    #notebook_handle=True
)

cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()

    ## update notebook 
    cnt += 1
    if cnt % 5 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)
