Healthcare Python Streaming Application Demo

This application demonstrates how to develop Python streaming applications from a Jupyter Notebook. The notebook submits two Streams applications to a local Streams cluster. The first is a pre-compiled SPL application that simulates patient waveform and vital data and publishes the data to a topic. The second is a Python Topology application written with the Streams Python API. It subscribes to the topic containing the patient data, analyzes the waveforms, and sends all of the data, including the analysis results, to the Streams view server.

Submitting the Python application from the notebook makes it possible to connect to the Streams view server and retrieve the data. Once retrieved, the data can be analyzed, manipulated, or visualized like any other data accessed from a notebook. In this demo, waveform graphs and numerical widgets display the patient's healthcare data.

The following diagram outlines the architecture of the demo.

Cell Description

This cell installs the Python modules required to run this notebook.


In [ ]:
!pip install --upgrade streamsx
!pip install --upgrade "git+https://github.com/IBMStreams/streamsx.health.git#egg=healthdemo&subdirectory=samples/HealthcareJupyterDemo/package"

Cell Description

This cell prompts for Streams credentials and submits the pre-compiled ingest microservice to the Streams cluster.

PhysionetIngestServiceMulti microservice

This microservice is provided as a pre-compiled SAB file. It retrieves patient waveform and vital data from a Physionet database (https://www.physionet.org/), using three different data sets as sources. The patient data is published to the ingest-physionet topic, where downstream applications or services can consume it.


In [ ]:
import getpass

from streamsx.topology import schema
from streamsx.topology.topology import Topology
from streamsx.topology.context import ConfigParams, submit
from streamsx.topology import functions

from healthdemo.streamtool import Streamtool

print('Submitting to a distributed instance.')

username = input('Username: ')
password = getpass.getpass(prompt='Password: ')

## display Streams Console link
print("Streams Console: ", end='')
streamtool = Streamtool()
streamtool.geturl()

numPatients = 3
    
## submit patient ingest microservice for 3 patients
streamtool.submitjob('../services/com.ibm.streamsx.health.physionet.PhysionetIngestServiceMulti.sab', 
                     params=['-P', 'num.patients=%d' % (numPatients)])

Cell Description

This cell builds the Python Topology application and submits it to the Streams cluster.

Healthcare Patient Python Topology Application

This cell contains the source code for the Python Topology application. As outlined in the architecture above, it is a streaming Python application that ingests the patient data from the ingest-physionet topic, filters and analyzes the data, and then sends the data to the Streams view server.
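
The filter and window stages can be pictured in plain Python, without a Streams instance. The following is only a sketch under assumptions: the tuple layout (a dict with hypothetical `patientId` and `value` keys), the names `KeepPatient`, `CountingWindow` and `run_pipeline`, and the window semantics (emit the most recent `length` tuples after every `trigger` arrivals) are illustrative and are not taken from the healthdemo package.

```python
from collections import deque


class KeepPatient:
    """Stateful predicate: keep only tuples for one patient.

    The 'patientId' key is a hypothetical field name used for illustration.
    """

    def __init__(self, patient_id):
        self.patient_id = patient_id

    def __call__(self, tup):
        return tup.get('patientId') == self.patient_id


class CountingWindow:
    """Buffer the most recent `length` tuples; emit a copy every `trigger` arrivals."""

    def __init__(self, length, trigger):
        self.buffer = deque(maxlen=length)  # old tuples fall off the left
        self.trigger = trigger
        self.since_emit = 0

    def __call__(self, tup):
        self.buffer.append(tup)
        self.since_emit += 1
        if self.since_emit >= self.trigger:
            self.since_emit = 0
            return list(self.buffer)
        return None  # nothing to emit yet


def run_pipeline(tuples, patient_id, length, trigger):
    """Apply the filter, then the window, to an in-memory iterable."""
    keep = KeepPatient(patient_id)
    window = CountingWindow(length, trigger)
    out = []
    for tup in tuples:
        if not keep(tup):
            continue
        batch = window(tup)
        if batch is not None:
            out.append(batch)
    return out


# Twelve tuples alternating between two patients.
sample = [{'patientId': 'patient-%d' % (1 + i % 2), 'value': i} for i in range(12)]
windows = run_pipeline(sample, 'patient-2', length=4, trigger=2)
```

Writing each stage as a callable object lets it carry state (the patient id, the buffer) between tuples, which is the same shape the topology code uses when it passes class instances to `filter` and `transform`.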


In [ ]:
from streamsx import rest
from healthdemo.patientmonitoring_functions import streaming_rpeak
from healthdemo.healthcare_functions import PatientFilter, GenTimestamp, aggregate
from healthdemo.windows import SlidingWindow

def getPatientView(patient_id):
    '''
    Select data of given patient_id, perform analysis and return view.
    
    Parameters
    ----------
    patient_id: int
      patient_id (1-based)
      
    Returns
    -------
    view: topology.View
      view data from Streams server 
    '''
    topo = Topology('HealthcareDemo_Patient%d' % (patient_id))

    ## Ingest, preprocess and aggregate patient data
    sample_rate = 125
    patient_data = topo.subscribe('ingest-physionet', schema.CommonSchema.Json) \
                       .map(functions.identity) \
                       .filter(PatientFilter('patient-%d' % (patient_id))) \
                       .transform(GenTimestamp(sample_rate)) \
                       .transform(SlidingWindow(length=sample_rate, trigger=sample_rate-1)) \
                       .transform(aggregate)
                        
    ## Calculate RPeak and RR delta
    patient_data = streaming_rpeak(patient_data, sample_rate, data_label='ECG Lead II')

    ## Create a view of the data
    patient_view = patient_data.view()
    
    sc = rest.StreamsConnection(username=username, password=password)
    sc.session.verify = False  # disable TLS certificate verification for this session
    submit('DISTRIBUTED', topo, config={ ConfigParams.STREAMS_CONNECTION: sc })
    return patient_view
    
# Retrieve view for a patient    
patient_view = getPatientView(2)
    
print('DONE')

Cell Description

This cell initializes all of the graphs and creates the background job that accesses the view data.

The view data is continuously retrieved from the Streams view server in a background job. Each graph object receives a copy of the data, then extracts and stores the part that is relevant to that particular graph. Each graph object maintains an internal queue, so each call to update() retrieves the next data point, removes it from the queue, and displays it.
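
The queueing behavior described here can be sketched with a small stand-in class. This is an illustration only: `QueuedSignal`, its attributes, and the shape of the view tuple (a dict mapping a signal label to a list of samples) are assumptions, not the implementation in healthdemo.medgraphs.

```python
from collections import deque


class QueuedSignal:
    """Hypothetical stand-in for a graph object.

    Buffers the samples of one signal and shows one sample per update().
    """

    def __init__(self, signal_label):
        self.signal_label = signal_label
        self.pending = deque()  # samples received but not yet displayed
        self.displayed = []     # samples already "drawn"

    def add(self, data):
        """Store only the samples relevant to this graph; ignore the rest."""
        if self.signal_label in data:
            self.pending.extend(data[self.signal_label])

    def update(self):
        """Move the oldest pending sample to the display, if there is one."""
        if self.pending:
            self.displayed.append(self.pending.popleft())


ecg = QueuedSignal('ECG Lead II')
resp = QueuedSignal('Resp')

# Every graph receives the same tuple and picks out its own signal.
view_tuple = {'ECG Lead II': [0.1, 0.9, 0.2], 'Resp': [1.5]}
for g in (ecg, resp):
    g.add(view_tuple)

# Two display passes: each graph consumes at most one sample per pass.
for _ in range(2):
    ecg.update()
    resp.update()
```

Separating add() from update() decouples the rate at which data arrives from the rate at which it is drawn: the collector can deliver samples in bursts while the display consumes them one at a time.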


In [ ]:
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

from healthdemo.medgraphs import ECGGraph, PoincareGraph, NumericText, ABPNumericText

## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport healthdemo.utils
%aimport healthdemo.medgraphs
%autoreload 1

## create the graphs ##
graphs = []

ecg_leadII_graph = ECGGraph(signal_label='ECG Lead II', title='ECG Lead II', plot_width=600, min_range=-0.5, max_range=2.0)
graphs.append(ecg_leadII_graph)

leadII_poincare = PoincareGraph(signal_label='Poincare - ECG Lead II', title='Poincare - ECG Lead II')
graphs.append(leadII_poincare)

ecg_leadV_graph = ECGGraph(signal_label='ECG Lead V', title='ECG Lead V', plot_width=600)
graphs.append(ecg_leadV_graph)

resp_graph = ECGGraph(signal_label='Resp', title='Resp', min_range=-1, max_range=3, plot_width=600)
graphs.append(resp_graph)

pleth_graph = ECGGraph(signal_label='Pleth', title='Pleth', min_range=0, max_range=5, plot_width=600)
graphs.append(pleth_graph)

hr_numeric = NumericText(signal_label='HR', title='HR', color='#7cc7ff')
graphs.append(hr_numeric)

pulse_numeric = NumericText(signal_label='PULSE', title='PULSE', color='#e71d32')
graphs.append(pulse_numeric)

spo2_numeric = NumericText(signal_label='SpO2', title='SpO2', color='#8cd211')
graphs.append(spo2_numeric)

abp_numeric = ABPNumericText(abp_sys_label='ABP Systolic', abp_dia_label='ABP Diastolic', title='ABP', color='#fdd600')
graphs.append(abp_numeric)

## retrieve data from Streams view in a background job ##
def data_collector(view, graphs):
    for d in iter(view.get, None):
        for g in graphs:
            g.add(d)
            
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, patient_view.start_data_fetch(), graphs)

Cell Description

This cell lays out and displays the graphs. An infinite loop continuously calls the update() method on each of the graphs. After every fifth pass over the graphs, a call to push_notebook() is made, which causes the notebook to redraw the graphics.
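
The refresh pattern used in the loop, updating on every pass but pushing to the notebook only on every Nth pass, can be tried in isolation with stubs. The sketch below is an assumption-laden illustration: `refresh_loop` and `CountingGraph` are hypothetical names, the loop is bounded so that it terminates, and a plain callback stands in for push_notebook().

```python
import time


def refresh_loop(graphs, push, iterations, push_every=5, delay=0.0):
    """Update every graph on each pass; call `push` on every `push_every`-th pass.

    Returns the number of times `push` was called.
    """
    pushes = 0
    for i in range(1, iterations + 1):
        for g in graphs:
            g.update()
        if i % push_every == 0:
            push()
            pushes += 1
        time.sleep(delay)
    return pushes


class CountingGraph:
    """Hypothetical graph stub that only counts update() calls."""

    def __init__(self):
        self.updates = 0

    def update(self):
        self.updates += 1


push_calls = []
stub = CountingGraph()
total = refresh_loop([stub], lambda: push_calls.append(1), iterations=12)
```

With the values in the cell below (a 0.008 s sleep and a push on every fifth pass), the notebook is refreshed at most once every 0.04 s, since five passes take at least 5 × 0.008 s before any update time is counted.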


In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox

## display graphs
show(
    row(
        column(
            ecg_leadII_graph.get_figure(), 
            ecg_leadV_graph.get_figure(), 
            resp_graph.get_figure(),
            pleth_graph.get_figure()
        ), 
        column(
            leadII_poincare.get_figure(),
            widgetbox(hr_numeric.get_figure()),
            widgetbox(pulse_numeric.get_figure()),
            widgetbox(spo2_numeric.get_figure()),
            widgetbox(abp_numeric.get_figure())
        )
    ),
    
    # If using bokeh <= 0.12.2, comment out the following argument
    notebook_handle=True
)

cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()

    ## update notebook 
    cnt += 1
    if cnt % 5 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)
