In [12]:
from pylab import *
%matplotlib inline
import datetime, glob, re, serial, struct, subprocess
from dateutil import tz
from collections import OrderedDict
import xml.etree.ElementTree as ET
In [13]:
def exec_ipynb(url):
    """Run every code cell of another IPython notebook in this module's globals.

    url -- an http(s) URL or a local file path of a notebook in JSON form.
           Both the nbformat 3 layout (``worksheets`` -> ``cells`` -> ``input``)
           and the nbformat 4 layout (``cells`` -> ``source``) are accepted.

    The sources of all code cells are joined with newlines and executed as one
    program, so every name the notebook defines becomes a global here.

    WARNING: this executes arbitrary code fetched from `url`; only point it at
    notebooks you trust.
    """
    import json, re
    if re.match(r'https?:', url):
        # Imported lazily so that loading a local notebook does not need urllib2
        import urllib2
        stream = urllib2.urlopen(url)
    else:
        stream = open(url)
    try:
        notebook = json.loads(stream.read())
    finally:
        # Release the file handle / HTTP connection even if reading or parsing fails
        stream.close()

    if 'worksheets' in notebook:
        # nbformat 3: cells live inside worksheets and keep their code under 'input'
        cells = []
        for worksheet in notebook['worksheets']:
            cells.extend(worksheet['cells'])
        source_key = 'input'
    else:
        # nbformat 4: cells are top-level and keep their code under 'source'
        cells = notebook['cells']
        source_key = 'source'

    sources = []
    for cell in cells:
        if cell['cell_type'] == 'code':
            # The cell source may be a list of lines or a single string; join handles both
            sources.append(''.join(cell[source_key]))
    exec('\n'.join(sources), globals())
# Execute the code cells of the Fluxtream-Library notebook so its definitions become globals here
exec_ipynb('Fluxtream-Library.ipynb')
# NOTE(review): fluxtream_login is not defined in the visible cells; presumably it comes
# from Fluxtream-Library.ipynb loaded on the line above -- confirm
fluxtream_login()
Out[13]:
In [14]:
# Note: if you are missing the "serial" package, you'll need to install pySerial.
# On a Mac, we've had luck installing it by running
#   easy_install pySerial
# from the command line.
# Interface to Nonin 3150
#
# Using references from Nonin:
# "3150-Specifications_7970_000-Rev-A.pdf"
# "3150 Commands.docx" (includes format of memory playback)
class Nonin3150(object):
    """Interface to a Nonin WristOx2 3150 over its USB serial port.

    Every command is a short ASCII string terminated by CRLF. The device
    answers with an ACK byte (0x06) followed by a command-specific payload.

    By convention, this library always stores time on the 3150 in UTC, since
    the 3150 supports neither daylight savings time nor timezones.
    """

    # Layout of the configuration record exchanged by CFG? / CFG=, as struct format codes
    fields = OrderedDict([
        ('Reserved1', 'c'),                     # Length 1.
        ('BluetoothEnable', 'c'),               # ASCII len 1. enabled:'1', disabled:'2'
        ('ActivationOption', 'c'),              # ASCII len 1. always log:'1', use start/stop time:'2', display only:'3'
        ('StorageRate', 'c'),                   # ASCII len 1. every second:'1', 2 seconds: '2', 4 seconds: '4'
        ('DisplayOption', 'c'),                 # ASCII len 1. full display while logging:'1', partial display:'2'
        ('StartTime1', '10s'),                  # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('StopTime1', '10s'),                   # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('StartTime2', '10s'),                  # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('StopTime2', '10s'),                   # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('StartTime3', '10s'),                  # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('StopTime3', '10s'),                   # ASCII YYMMDDhhmm. Only used for ActivationOption '2'
        ('ProgrammableIdentification', '50s'),  # ASCII len 50. Arbitrary ID settable by user
        ('SoftwarePartNumber', '4s'),           # ASCII len 4. First 4 digits
        ('SoftwareRevision', '3s'),             # ASCII len 3
        ('SoftwareRevDate', '6s'),              # ASCII YYMMDD
        ('Reserved2', '6s'),                    # Length 6
    ])
    format = ''.join(fields.values())
    # Size in bytes of the packed configuration record (134 for the layout above)
    config_length = struct.calcsize(format)
    date_format = '%Y-%m-%d %H:%M:%S %Z'

    def __init__(self):
        """Locate the device, open its serial port and print its firmware and clock.

        Raises Exception if no device (or more than one candidate port) is
        found, or if the initial handshake fails; in the latter case the port
        is closed again before the error propagates.
        """
        self.device_path = self.find_device_path()
        print('Opening %s...' % self.device_path)
        self.device = serial.Serial(self.device_path, timeout=1)
        print('Opened %s' % self.device_path)
        try:
            config = self.get_config()
            print('Found Nonin 3150 at %s, software %s.%s.20%s' %
                  (self.device_path, config['SoftwarePartNumber'],
                   config['SoftwareRevision'], config['SoftwareRevDate']))
            device_time = self.get_current_time()
            print('Current time as reported by device in GMT: %s' % device_time.strftime(self.date_format))
            local_time = device_time.astimezone(tz.tzlocal())
            print(' (Converted to your current local timezone: %s)' % local_time.strftime(self.date_format))
        except Exception:
            # Don't leave the serial port open when the device does not answer as expected
            self.device.close()
            raise

    def close(self):
        """Close the serial port if it was opened."""
        device = getattr(self, 'device', None)
        if device is not None:
            device.close()

    @staticmethod
    def find_device_path():
        """Return the path of the serial port the Nonin is attached to.

        Tries the `ioreg` command first and, when it cannot be run or fails,
        falls back to globbing /dev/cu.usbmodem*.

        Raises Exception when zero or more than one candidate is found.
        """
        try:
            ioreg = subprocess.check_output(['ioreg', '-l', '-w', '0', '-c', 'IOUSBDevice', '-r'])
            # NOTE(review): the '.*' parts are greedy and match across lines (DOTALL), so with
            # several serial devices listed this may capture a later device's callout path -- confirm
            candidates = [match.group(1) for match in re.finditer(r'"USB Product Name" = "Model 3150".*"USB Vendor Name" = "Nonin.*"IOCalloutDevice" = "(.*?)"', ioreg, re.DOTALL)]
        except (OSError, subprocess.CalledProcessError):
            # ioreg is missing or exited with an error: fall back to guessing from device names
            candidates = glob.glob('/dev/cu.usbmodem*')
        if len(candidates) == 0:
            raise Exception("Can't find Nonin device. Please be sure it's plugged in using the USB cable.")
        if len(candidates) > 1:
            raise Exception("Hmm, found multiple USB serial ports. Need to address this issue in the software.")
        return candidates[0]

    def get_config(self):
        """Read the configuration record and return it as an OrderedDict keyed like `fields`.

        Raises Exception when the ACK is missing, the reply is short, or the
        16-bit big-endian checksum that follows the record does not match.
        """
        self.device.flushInput()
        self.device.write('CFG?\r\n')
        self.require_ack()
        data = self.require_bytes(self.config_length)
        checksum = struct.unpack('>H', self.require_bytes(2))[0]
        if checksum != sum([ord(ch) for ch in data]):
            raise Exception('CFG? incorrect checksum')
        unpacked = struct.unpack(self.format, data)
        return OrderedDict(zip(self.fields.keys(), unpacked))

    def set_config(self, config):
        """Write a configuration record to the device.

        config -- an OrderedDict with exactly the keys, in the same order, as
                  returned by get_config.

        Raises Exception when the keys differ or the device does not
        acknowledge the command.
        """
        if list(config.keys()) != list(self.fields.keys()):
            raise Exception('config must be an OrderedDict with keys as returned from get_config')
        data = struct.pack(self.format, *config.values())
        checksum = struct.pack('>H', sum([ord(ch) for ch in data]))
        self.device.flushInput()
        self.device.write('CFG=' + data + checksum + '\r\n')
        self.require_ack()
        self.require_crlf()

    def enable_logging(self, interval=1):
        """Configure the device to always log, storing one sample every `interval` seconds.

        interval -- 1, 2 or 4. Checked before any device I/O takes place.

        Raises Exception for any other interval.
        """
        interval = int(interval)
        valid_intervals = [1,2,4]
        if interval not in valid_intervals:
            raise Exception('interval must be one of %s' % valid_intervals)
        config = self.get_config()
        # Always log when sensor detects signal
        config['ActivationOption'] = '1'
        config['StorageRate'] = str(interval)
        self.set_config(config)

    def get_current_time(self):
        """Return the device clock as a timezone-aware datetime, interpreted as UTC."""
        self.device.flushInput()
        self.device.write('DTM?\r\n')
        self.require_ack()
        # The reply is 12 ASCII digits: YYMMDDhhmmss
        stamp = self.require_bytes(12)
        self.require_crlf()
        return datetime.datetime.strptime(stamp, '%y%m%d%H%M%S').replace(tzinfo=tz.tzutc())

    def set_current_time(self):
        """Set the device clock to the host's current UTC time."""
        self.device.flushInput()
        # Write time in UTC, format YYMMDDhhmmss
        self.device.write(datetime.datetime.utcnow().strftime('DTM=%y%m%d%H%M%S\r\n'))
        self.require_ack()
        self.require_crlf()

    def get_header(self):
        """Send HDR? and return everything the device sends until a read times out."""
        self.device.flushInput()
        self.device.write('HDR?\r\n')
        self.require_ack()
        return self.read_until_timeout()

    def clear_sessions(self):
        """Send MCL! to erase the sessions stored on the device."""
        self.device.flushInput()
        self.device.write('MCL!\r\n')
        self.require_ack()
        self.require_crlf()
        print("Sessions cleared on Nonin")

    @staticmethod
    def _strip_triplet_checksums(memory):
        """Split the raw memory dump into (byte0, byte1) pairs, verifying each triplet.

        The dump is a sequence of 3-byte triplets whose third byte is the sum
        of the first two modulo 256.
        """
        if len(memory) % 3 != 0:
            raise Exception('MPB?: Invalid memory length read')
        pairs = []
        for index in range(0, len(memory) // 3):
            first, second, check = [ord(ch) for ch in memory[index * 3 : (index + 1) * 3]]
            if (first + second) % 256 != check:
                raise Exception('MPB?: invalid checksum in triplet %d' % index)
            pairs.append((first, second))
        return pairs

    def read_sessions(self):
        """Download the device memory and decode it into logging sessions.

        Returns a list of sessions. Each session is a list of
        (sample_time, pulse_rate, spo2) tuples in chronological order, where
        pulse_rate and spo2 are None when the device stored 255 for them.
        Sessions with no samples, or with an undecodable start or stop time,
        are left out.

        Raises Exception on a bad length, checksum, header or format code, or
        when the data ends in the middle of a session header.
        """
        self.device.flushInput()
        self.device.write('MPB?\r\n')
        self.require_ack()
        print('Reading memory from Nonin...')
        memory = self.read_until_timeout()
        print('Read %d bytes' % len(memory))

        # Check and strip checksums
        data = self._strip_triplet_checksums(memory)

        # Decode sessions
        header = (254, 253)
        # Marker + (rate, format) + three 3-triplet times: current, stop, start
        header_length = 11
        i = 0
        sessions = []
        while i < len(data):
            if data[i] != header:
                raise Exception('MPB?: invalid header at triplet %d' % i)
            print('Session header starting at triplet %d' % i)
            if i + header_length > len(data):
                raise Exception('MPB?: truncated session header at triplet %d' % i)
            i += 1
            (seconds_per_sample, sample_format) = data[i]
            if sample_format != 2:
                raise Exception('MPB?: unknown format at triplet %d' % i)
            i += 1
            print(' Seconds per sample: %d' % seconds_per_sample)
            # Skip the three triplets holding the device's current time; it is not used
            i += 3
            # Don't read start and stop time unless we confirm there are samples
            # The Nonin tends to have an empty session, with invalid start and stop time
            stop_time_index = i
            i += 3
            start_time_index = i
            i += 3

            # Consecutive samples are seconds_per_sample apart, not always one second
            sample_spacing = datetime.timedelta(seconds=seconds_per_sample)
            session = []
            valid = False
            sample_time = None
            while i < len(data) and data[i] != header:
                if not session:
                    # We have a non-empty session. Go ahead and parse the times
                    # Samples are reversed in time
                    stop_time = self.decode_memory_time(data, stop_time_index)
                    start_time = self.decode_memory_time(data, start_time_index)
                    sample_time = stop_time
                    if start_time and stop_time:
                        print(' Time range: %s to %s' % (start_time.strftime(self.date_format),
                                                         stop_time.strftime(self.date_format)))
                        valid = True
                    else:
                        print(' Invalid start or stop time')
                (pulse_rate, spo2) = data[i]
                i += 1
                if pulse_rate == 255:
                    pulse_rate = None
                elif pulse_rate > 200:
                    # Values over 200 are compressed to handle high pulse rates
                    pulse_rate = 200 + (pulse_rate - 200) * 2
                if spo2 == 255:
                    spo2 = None
                session.append((sample_time, pulse_rate, spo2))
                if sample_time is not None:
                    sample_time -= sample_spacing
            session.reverse()
            print(' %s data samples' % len(session))
            if valid and len(session) > 0:
                sessions.append(session)
        print('Total of %d valid sessions' % len(sessions))
        return sessions

    @staticmethod
    def decode_memory_time(memory, i):
        """Decode the timestamp stored in the three pairs memory[i : i + 3].

        The pairs are (month, day), (year, minute), (second, hour), with the
        year counted from 2000. Returns a UTC datetime, or None (after
        printing a message) when the fields do not form a valid date.
        """
        [(month, day), (year, minute), (second, hour)] = memory[i : i + 3]
        try:
            return datetime.datetime(year + 2000, month, day, hour, minute, second, tzinfo=tz.tzutc())
        except ValueError:
            print("Couldn't make a time from YY=%d MM=%d DD=%d HH:MM:SS=%d:%d:%d" % (year, month, day, hour, minute, second))
            return None

    def read_until_timeout(self):
        """Read from the device until a read returns nothing; return all data received."""
        chunks = []
        while True:
            chunk = self.device.read(1000)
            if not chunk:
                return ''.join(chunks)
            chunks.append(chunk)

    def require_bytes(self, n):
        """Read exactly n bytes; raise Exception if a read comes back empty first."""
        ret = ''
        while len(ret) < n:
            chunk = self.device.read(n - len(ret))
            if not chunk:
                raise Exception('Expected %d bytes but only received %d' % (n, len(ret)))
            ret += chunk
        return ret

    def require_ack(self):
        """Consume one byte and raise Exception unless it is ACK (0x06)."""
        if self.device.read(1) != '\x06':
            raise Exception('Expected ACK not received')

    def require_crlf(self):
        """Consume two bytes and raise Exception unless they are CR LF."""
        if self.device.read(2) != '\r\n':
            raise Exception('Expected CRLF not received')
In [15]:
import itertools

# Connect to the Nonin and configure it to log whenever the sensor detects a signal
nonin = Nonin3150()
nonin.enable_logging()


def report_nonin_clock(heading):
    """Print the Nonin's clock under `heading`, in UTC and in local time; return the reading."""
    reading = nonin.get_current_time()
    print(heading)
    print(reading.strftime(' Time on Nonin, GMT: %Y-%m-%d %H:%M:%S UTC'))
    print(reading.astimezone(tz.tzlocal()).strftime(' Time on Nonin, translated to local timezone: %Y-%m-%d %H:%M:%S%z'))
    return reading


# Set time on Nonin from laptop, using GMT timezone
current_time = report_nonin_clock('Before setting Nonin clock:')
print('Setting clock on Nonin to current host time...')
nonin.set_current_time()
current_time = report_nonin_clock('After setting Nonin clock:')

# Download the stored sessions and flatten them into one list of (time, pulse, SpO2) samples
sessions = nonin.read_sessions()
samples = list(itertools.chain(*sessions))
print('%d samples' % len(samples))

fig, ax = plt.subplots(figsize=(40,6))
times = [sample[0] for sample in samples]
ax.plot(times, [sample[1] for sample in samples], label="Pulse")
ax.plot(times, [sample[2] for sample in samples], label="SpO2")
# Legend above plot
ax.legend(loc='upper center', bbox_to_anchor=(0.5, 1.10),
          fancybox=True, shadow=True, ncol=5)
ax.grid(alpha=0.5, linestyle='dashed', linewidth=0.5) # Grid
ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter('%m/%d %H:%M:%S %Z', tz=tz.tzlocal()))
plt.xticks(rotation='vertical')
# End the cell on None so the return value of plt.xticks is not echoed as cell output
None
In [11]:
import base64
import json
import httplib
import urllib
# Imported here so this cell does not rely on an earlier cell having imported it
import itertools

# Concatenate all the sessions into a single list of samples
samples = list(itertools.chain.from_iterable(sessions))

# April 2014: Bug in fluxtream.org prevents uploading samples with nulls; for now, filter out None's
# We can remove these 3 lines once that's fixed
print('%d samples' % len(samples))
samples = [sample for sample in samples if None not in sample]
print('%d samples after filtering out samples containing None' % len(samples))
def epoch_time(dt):
    """Return the number of seconds from 1970-01-01 00:00:00 UTC to `dt`.

    dt -- a timezone-aware datetime (it is subtracted from an aware epoch).
    """
    unix_epoch = datetime.datetime(1970, 1, 1, tzinfo=tz.tzutc())
    elapsed = dt - unix_epoch
    return elapsed.total_seconds()
# Convert timestamps to epoch time
print 'Data has %d samples' % len(samples)
# Each row is [epoch seconds, pulse, SpO2]; the value order matches channel_names below
data = [[epoch_time(sample[0]), sample[1], sample[2]] for sample in samples]
print 'Excerpt:', data[0:10]
dev_nickname = 'Nonin3150'
channel_names = ['Pulse', 'SpO2']
# NOTE(review): fluxtream_upload is not defined in the visible cells; presumably it comes
# from Fluxtream-Library.ipynb loaded via exec_ipynb -- confirm
success = fluxtream_upload(dev_nickname, channel_names, data)
# Erase the sessions stored on the device only when the upload reported success
if success:
nonin.clear_sessions()
# NOTE(review): indentation is not visible in this view, so it is unclear whether this
# timestamp is printed only after a successful upload or unconditionally -- confirm
print datetime.datetime.now().strftime('Current timestamp: %Y-%m-%d %H:%M:%S')
In [ ]: