This application demonstrates how users can develop Python streaming applications on DSX. The DSX Notebook ultimately submits two Streams applications to a local Streams cluster. The first is a pre-compiled SPL application that simulates patient waveform and vital data and publishes the data to a topic. The second is a Python Topology application written using the Streams Python API. It subscribes to the topic containing the patient data, performs analysis on the waveforms, and sends all of the data, including the results of the analysis, to the Streams view server.
Submitting the Python application from the Notebook makes it possible to connect to the Streams view server and retrieve the data. Once the data has been retrieved, it can be analyzed, manipulated or visualized like any other data accessed from a notebook. In this demo, waveform graphs and numeric widgets display the patient's healthcare data.
The following diagram outlines the architecture of the demo.
In [ ]:
!pip install --user --upgrade streamsx
!pip install --user --upgrade "git+https://github.com/IBMStreams/streamsx.health.git#egg=healthdemo&subdirectory=samples/HealthcareJupyterDemo/package"
This cell is responsible for building and submitting the Streams applications to the Streams cluster.
This microservice comes in the form of a pre-compiled SAB file. The microservice retrieves patient waveform and vital data from a Physionet database (https://www.physionet.org/). Three different data sets are used as the source. The patient data is published to the ingest-physionet topic, which allows it to be consumed by downstream applications or services.
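The topic-based decoupling described above can be illustrated with a minimal in-memory sketch in plain Python. This is not how Streams implements publish/subscribe; the `TopicBroker` class and the fields of the sample tuple are hypothetical and exist only to show that a producer needs to know the topic name, not its consumers.

```python
from collections import defaultdict


class TopicBroker:
    """Hypothetical in-memory stand-in for a topic-based publish/subscribe service."""

    def __init__(self):
        # Maps a topic name to the list of subscriber callbacks.
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, tup):
        # Deliver the tuple to every subscriber of this topic.
        for callback in self._subscribers[topic]:
            callback(tup)


broker = TopicBroker()
received_by_alerts = []
received_by_monitor = []

# Two independent consumers subscribe to the same topic name.
broker.subscribe('ingest-physionet', received_by_alerts.append)
broker.subscribe('ingest-physionet', received_by_monitor.append)

# The producer publishes one (made-up) tuple; both consumers receive it.
broker.publish('ingest-physionet', {'patientId': 'patient-1', 'PULSE': 72})
```

In the demo itself, the two Python applications play the role of the two consumers: each subscribes to ingest-physionet independently of the other.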
In [ ]:
from streamsx.topology.topology import Topology, schema
from streamsx.topology.context import ConfigParams, submit
from streamsx.topology import functions
print ('Submitting to streaming analytic service.')
# 'vcap_services.json' is created by running the "Create VCAP Service Credential" notebook
# Change the service name accordingly
config = {
    ConfigParams.VCAP_SERVICES: 'vcap_services.json',
    ConfigParams.SERVICE_NAME: 'Streaming-Analytics'
}
numPatients = 3
print ("Don't forget to submit patient ingest microservice manually. (Set num.patients >= 3)")
This cell contains the source code for the Python Topology application. It is a streaming Python application that ingests the patient data from the ingest-physionet topic, filters for patients who require attention, and then sends the data to the Streams view server.
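The filtering rule can be tried out without a Streams instance. The sketch below applies the same threshold as the demo (a positive pulse below 80) to a few made-up readings. The flat dictionaries are an assumption for illustration only; the real tuples are JSON objects that the demo unpacks with `get_sampled_data_values`.

```python
def patient_needs_attention(pulse):
    """Same rule as the demo's filter: a positive pulse reading below 80."""
    return 0 < pulse < 80


# Hypothetical readings, not the actual tuple layout used by the demo.
readings = [
    {'patientId': 'patient-1', 'pulse': 72},
    {'patientId': 'patient-2', 'pulse': 95},
    {'patientId': 'patient-3', 'pulse': 0},  # excluded by the pulse > 0 check
]

# Keep only the patients whose reading satisfies the rule.
flagged = [r['patientId'] for r in readings if patient_needs_attention(r['pulse'])]
print(flagged)  # ['patient-1']
```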
In [ ]:
from healthdemo.utils import get_patient_id, get_sampled_data_values
topo = Topology('HealthcareDemo_PatientAlert')
def patientNeedsAttention(tup):
    # Flag a patient whose first sampled pulse value is positive but below 80
    pulse = get_sampled_data_values(tup, 'PULSE')[0]
    return 0 < pulse < 80
## Create a view that shows patients that require attention
patients_requiring_attention = topo.subscribe('ingest-physionet', schema.CommonSchema.Json) \
    .map(functions.identity) \
    .filter(patientNeedsAttention)
view = patients_requiring_attention.view()
submit('ANALYTICS_SERVICE', topo, config)
print("DONE")
In [ ]:
from healthdemo.medgraphs import NumericText
import re
## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport healthdemo.utils
%aimport healthdemo.medgraphs
%autoreload 1
## create the graphs ##
graphs = []
alert_div = '''
<div style="background-color: #152935; border: 3px solid black; height: 120px; width: 700px; font-family: arial; clear: both; color: {0}">
<div style="margin-left: 5px; margin-top: 20px; font-size: 30px; float:left">{1} requires Attention!</div>
<div style="margin-right: 5px; margin-top: 20px; font-size: 30px; float:right">pulse: {2} {3}</div>
</div>
'''
for idx in range(numPatients):
    pulse_numeric = NumericText(signal_label='PULSE', title='Patient-%d' % (idx+1), color='#e71d32', override_str=alert_div)
    graphs.append(pulse_numeric)
## retrieve data from Streams view in a background job ##
def data_collector(view, graphs):
    for d in iter(view.get, None):
        # Patient IDs are 1-based, so patient n maps to graphs[n-1]
        patientId = int(re.findall(r'\d+', get_patient_id(d))[0])
        if 1 <= patientId <= numPatients:
            graphs[patientId-1].add(d)
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, view.start_data_fetch(), graphs)
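The `iter(view.get, None)` loop in the collector above relies on Python's two-argument form of `iter`, which keeps calling a function until it returns a sentinel value. The following sketch shows the same pattern with only the standard library. It uses `threading.Thread` in place of the IPython background job manager and a plain `queue.Queue` in place of the object returned by `view.start_data_fetch()`; both substitutions are assumptions made so the example runs anywhere.

```python
import queue
import threading


def data_collector(source, sink):
    # iter(callable, sentinel) calls source.get() repeatedly and stops
    # as soon as it returns the sentinel value (None here).
    for item in iter(source.get, None):
        sink.append(item)


source = queue.Queue()
collected = []

# Run the collector in the background so the main thread stays free.
worker = threading.Thread(target=data_collector, args=(source, collected))
worker.start()

for value in (1, 2, 3):
    source.put(value)
source.put(None)  # sentinel: tells the collector to stop

worker.join()
print(collected)  # [1, 2, 3]
```

Because `get` blocks until an item is available, the collector thread simply waits when no data has arrived yet.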
In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, widgetbox
plots = []
for g in graphs:
    plots.append(widgetbox(g.get_figure()))
## display graphs
print("Monitoring patients' pulses...")
show(column(plots),
    # If using bokeh > 0.12.2, uncomment the following statement
    #notebook_handle=True
    )
cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()
    ## update notebook
    cnt += 1
    if cnt % 125 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)
This cell contains the source code for the Python Topology application. As outlined in the architecture above, this is a streaming Python application that ingests the patient data from the ingest-physionet topic, filters and analyzes the data, and then sends the data to the Streams view server.
In [ ]:
from healthdemo.patientmonitoring_functions import streaming_rpeak
from healthdemo.healthcare_functions import PatientFilter, GenTimestamp, aggregate
from healthdemo.windows import SlidingWindow
def getPatientView(patient_id):
    '''
    Select data of given patient_id, perform analysis and return view.
    Parameters
    ----------
    patient_id: int
        patient_id (1-based)
    Returns
    -------
    view: topology.View
        view data from Streams server
    '''
    topo = Topology('HealthcareDemo_Patient%d' % (patient_id))
    ## Ingest, preprocess and aggregate patient data
    sample_rate = 125
    patient_data = topo.subscribe('ingest-physionet', schema.CommonSchema.Json) \
        .map(functions.identity) \
        .filter(PatientFilter('patient-%d' % (patient_id))) \
        .transform(GenTimestamp(sample_rate)) \
        .transform(SlidingWindow(length=sample_rate, trigger=sample_rate-1)) \
        .transform(aggregate)
    ## Calculate RPeak and RR delta
    patient_data = streaming_rpeak(patient_data, sample_rate, data_label='ECG Lead II')
    ## Create a view of the data
    patient_view = patient_data.view()
    submit('ANALYTICS_SERVICE', topo, config)
    return patient_view
# Retrieve view for a patient
patient_view = getPatientView(2)
print('DONE')
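The windowing step in the pipeline above takes a `length` and a `trigger`. The sketch below shows one common interpretation of those two parameters: keep the most recent `length` items and emit a snapshot after every `trigger` new arrivals. This is an assumption about the general technique, not a description of `healthdemo.windows.SlidingWindow`, whose exact behaviour may differ; the class name `CountSlidingWindow` is hypothetical.

```python
from collections import deque


class CountSlidingWindow:
    """Hypothetical count-based sliding window, for illustration only."""

    def __init__(self, length, trigger):
        self._items = deque(maxlen=length)  # oldest items are evicted automatically
        self._trigger = trigger
        self._since_last_emit = 0

    def __call__(self, item):
        self._items.append(item)
        self._since_last_emit += 1
        if self._since_last_emit >= self._trigger:
            self._since_last_emit = 0
            return list(self._items)  # snapshot of the current window
        return None  # nothing to emit yet


window = CountSlidingWindow(length=4, trigger=2)
emitted = [w for w in map(window, range(1, 7)) if w is not None]
print(emitted)  # [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
```

A callable object like this keeps its state between tuples, which is why the demo passes class instances rather than plain functions to the stateful steps of the pipeline.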
This cell initializes all of the graphs that will be used and creates the background job that accesses the view data.
The view data is continuously retrieved from the Streams view server in a background job. Each graph object receives a copy of the data and extracts and stores the data that is relevant for that particular graph. Each graph object maintains an internal queue, so that each time a call to update() is made on a graph object, the next element in the queue is retrieved, removed from the queue, and displayed.
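The add/update behaviour described above can be sketched as follows. `QueueBackedGraph` is a hypothetical stand-in, not the `healthdemo.medgraphs` API, and the flat dictionaries used as tuples are an assumption; the sketch only shows how each graph keeps the values it cares about in its own queue and consumes one per call to `update()`.

```python
from collections import deque


class QueueBackedGraph:
    """Hypothetical stand-in for a graph object with an internal queue."""

    def __init__(self, signal_label):
        self.signal_label = signal_label
        self._pending = deque()  # data points waiting to be displayed
        self.displayed = []      # data points already "drawn"

    def add(self, tup):
        # Keep only the value relevant to this graph, if present.
        if self.signal_label in tup:
            self._pending.append(tup[self.signal_label])

    def update(self):
        # Display the next queued point, removing it from the queue.
        if self._pending:
            self.displayed.append(self._pending.popleft())


hr = QueueBackedGraph('HR')
pulse = QueueBackedGraph('PULSE')
for tup in ({'HR': 70, 'PULSE': 68}, {'HR': 71}):
    for g in (hr, pulse):
        g.add(tup)  # every graph receives every tuple

hr.update()
pulse.update()
pulse.update()  # queue is already empty, so this call does nothing
```

Separating `add()` from `update()` lets the background job enqueue data at whatever rate it arrives while the display loop drains the queues at its own pace.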
In [ ]:
from healthdemo.medgraphs import ECGGraph, PoincareGraph, NumericText, ABPNumericText
## load BokehJS visualization library (must be loaded in a separate cell)
from bokeh.io import output_notebook, push_notebook
from bokeh.resources import INLINE
output_notebook(resources=INLINE)
%autosave 0
%reload_ext autoreload
%aimport healthdemo.utils
%aimport healthdemo.medgraphs
%autoreload 1
## create the graphs ##
graphs = []
ecg_leadII_graph = ECGGraph(signal_label='ECG Lead II', title='ECG Lead II', plot_width=600, min_range=-0.5, max_range=2.0)
graphs.append(ecg_leadII_graph)
leadII_poincare = PoincareGraph(signal_label='Poincare - ECG Lead II', title='Poincare - ECG Lead II')
graphs.append(leadII_poincare)
ecg_leadV_graph = ECGGraph(signal_label='ECG Lead V', title='ECG Lead V', plot_width=600)
graphs.append(ecg_leadV_graph)
resp_graph = ECGGraph(signal_label='Resp', title='Resp', min_range=-1, max_range=3, plot_width=600)
graphs.append(resp_graph)
pleth_graph = ECGGraph(signal_label='Pleth', title='Pleth', min_range=0, max_range=5, plot_width=600)
graphs.append(pleth_graph)
hr_numeric = NumericText(signal_label='HR', title='HR', color='#7cc7ff')
graphs.append(hr_numeric)
pulse_numeric = NumericText(signal_label='PULSE', title='PULSE', color='#e71d32')
graphs.append(pulse_numeric)
spo2_numeric = NumericText(signal_label='SpO2', title='SpO2', color='#8cd211')
graphs.append(spo2_numeric)
abp_numeric = ABPNumericText(abp_sys_label='ABP Systolic', abp_dia_label='ABP Diastolic', title='ABP', color='#fdd600')
graphs.append(abp_numeric)
## retrieve data from Streams view in a background job ##
def data_collector(view, graphs):
    for d in iter(view.get, None):
        for g in graphs:
            g.add(d)
from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(data_collector, patient_view.start_data_fetch(), graphs)
In [ ]:
import time
from bokeh.io import show
from bokeh.layouts import column, row, widgetbox
## display graphs
show(
    row(
        column(
            ecg_leadII_graph.get_figure(),
            ecg_leadV_graph.get_figure(),
            resp_graph.get_figure(),
            pleth_graph.get_figure()
        ),
        column(
            leadII_poincare.get_figure(),
            widgetbox(hr_numeric.get_figure()),
            widgetbox(pulse_numeric.get_figure()),
            widgetbox(spo2_numeric.get_figure()),
            widgetbox(abp_numeric.get_figure())
        )
    ),
    # If using bokeh > 0.12.2, uncomment the following statement
    #notebook_handle=True
)
cnt = 0
while True:
    ## update graphs
    for g in graphs:
        g.update()
    ## update notebook
    cnt += 1
    if cnt % 5 == 0:
        push_notebook() ## refresh the graphs
        cnt = 0
    time.sleep(0.008)