Introduction

This iPython notebook walks you through reading data from a Fluxtream data channel, computing a function on it, and uploading a new computed channel back to Fluxtream. This particular example reads data from a Nonin SpO2 sensor channel and writes back a thresholded channel called SpO2_thresh_X, where X is the value of a given threshold. For each data point at or below that threshold, the new channel records how far the reading falls below it.

The real value of this notebook, I hope, is to provide an example of how to read data from a Fluxtream datastore channel, do something to it, and write it back. If you have a Nonin 3150, you can import and run the Nonin-WristOx2-3150 notebook to upload your channels to try this on. Otherwise, if you email me at info@fluxtream.org I can add you as a buddy to access the Nonin 3150 channels uploaded to the test account (Guest ID=1). Or you can just modify the channel names and function to do something interesting to your own data channels.

If you are new to iPython notebooks, here is the main IP[y] website. You will need to install Python and iPython notebook on your local system, run a local ipython kernel, and install a local copy of this notebook to be able to execute and modify the code below. Install instructions are here. On OS X systems, you can start the server by going to Terminal and calling 'ipython notebook'. This will start a local web server and open an IP[y] page talking to it in a web browser. Within the IP[y] page, you can open a saved iPython notebook by going to File/Open.

Once you have IP[y] generally working on your system, here's a brief intro in how to use it:

  • A green outline shows the currently selected cell.
  • Select a different cell by clicking on it, or by pressing escape to enter command mode (grey outline) and using the keyboard shortcuts listed under the Help menu.
  • Execute the currently selected cell by either clicking the play button on the icon bar at the top, selecting Cell/Run from the menu bar, or by using the keyboard shortcut Shift-Return.

When a given cell is executed, it may print output which appears below the cell, and the cursor will continue to the next cell. If the next cell is tall, you might need to scroll back up to see the previous cell's output.

Each cell in this notebook covers a particular step, topic, or action requiring your input, and starts with comments saying what it's about and what you should do.

Cells that require entry of sensitive information, such as passwords, start with a phrase like "Execute and fill in the fields below". These generally create entry forms in the output area below the cell that you need to fill in. Generally, the cell will clear the sensitive input boxes after you click the button. It may also print out suggestions about how you could set up a new cell for future use if you're confident others won't see your copy of the notebook.

Cells that require customization for your own setup start with "Modify". These include cells where you configure which channels you want to process and what you want to call the resulting data. These require some thought.

Cells that define functions or do other things that don't require user input or modification generally just start with "Execute". These can just be executed without much consideration, though you may want to go back later to understand or modify them.

Please enjoy, tinker, modify, etc. Feel free to contact info@fluxtream.org if you have questions.

Note that uploading data multiple times to a given device and channel with identical time values each time will safely overwrite the previous values. However, there is no API or user interaction component in Fluxtream that allows the deletion of a previously uploaded device or channel, and you can't delete data points already uploaded to a given channel. If you create device names or channel names, or upload data at incorrect timepoints within a given channel, and later regret it, please send the details of your situation, including your Fluxtream username, Guest ID, and which devices and/or channels you want deleted, to info@fluxtream.org. You can get your Guest ID by doing the step below to set up your Fluxtream credentials and looking at the value of fluxtream_guest_id. Also note that the Fluxtream upload API cannot currently handle empty cells within the data array used in an upload call. I'm hoping to fix this in the future.
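
Because the upload API cannot handle empty cells, one possible workaround is to drop incomplete rows before calling fluxtream_upload. The following is only a minimal sketch and not part of the original notebook: the helper name drop_incomplete_rows is hypothetical, and it assumes that the data is a list of rows of the form [epoch_time, value, ...], with None or an empty string marking a missing cell.

```python
def drop_incomplete_rows(data):
    """Return only the rows in which every cell holds a value.

    Assumption (hypothetical): a missing cell is None or ''.
    """
    complete = []
    for row in data:
        if all(cell is not None and cell != '' for cell in row):
            complete.append(row)
    return complete


# Hypothetical sample rows: [epoch_time, computed value]
rows = [[1400000000, 2.0],
        [1400000001, None],   # missing value: dropped
        [1400000002, ''],     # empty string: dropped
        [1400000003, 0.0]]    # zero is a real value: kept

clean_rows = drop_incomplete_rows(rows)
```

The filtered list could then be passed to fluxtream_upload in place of the raw one. Note that a value of 0.0 is kept, since it is a real reading rather than an empty cell.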

Setup for uploading to Fluxtream


In [1]:
# Execute this cell to define the functions for calling the Fluxtream upload API for the 
# credentials entered below
import json, subprocess, urllib, csv

# By default, the upload function will send data to the main server at fluxtream.org.  
# If you want to have this use a different fluxtream server, change it here
# and make sure the username and password entered below are valid on that server.
global fluxtream_server
fluxtream_server = "fluxtream.org"

def setup_fluxtream_credentials():
    # Call the Fluxtream guest API, documented at 
    #   https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs#BodyTrackserverAPIs-GettheIDfortheguest

    # Make sure it works and harvest the Guest ID for future use
    global fluxtream_server, fluxtream_username, fluxtream_password, fluxtream_guest_id

    # Make sure we have fluxtream credentials set properly
    if not('fluxtream_server' in globals() and 
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials before uploading data.  See above.")

    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['https://%s/api/guest' % fluxtream_server]

    result_str = subprocess.check_output(cmd)
    #print '  Result=%s' % (result_str)

    try:
        response = json.loads(result_str)

        if 'id' in response:
            fluxtream_guest_id = int(response['id'])
        else:
            raise Exception('Received unexpected response %s while trying to check credentials for %s on %s' % (response, 
                                                                                                            fluxtream_username, 
                                                                                                            fluxtream_server))

        print 'Verified credentials for user %s on %s work. Guest ID=%d' % (fluxtream_username, fluxtream_server, fluxtream_guest_id)
    except:
        print "Attempt to check credentials of user %s failed" % (fluxtream_username)
        print "Server returned response of: %s" % (result_str)
        print "Check login to https://%s works and re-enter your Fluxtream credentials above" % (fluxtream_server)
        raise
    
def fluxtream_upload(dev_nickname, channel_names, data):
    global fluxtream_server, fluxtream_username, fluxtream_password
    
    # Make sure we have some data to send
    if data is None or len(data)<1:
        print 'Nothing to upload to %s %s' % (dev_nickname, channel_names)        
        return

    # Make sure we have fluxtream credentials set properly
    if not('fluxtream_server' in globals() and 
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials before uploading data.  See above.")

    # Send to BodyTrack upload API, documented at 
    #   https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs#BodyTrackserverAPIs-Storingdata
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['-d', 'dev_nickname=%s' % dev_nickname]
    cmd += ['-d', 'channel_names=%s' % json.dumps(channel_names)]
    cmd += ['-d', 'data=%s' % json.dumps(data)]
    cmd += ['https://%s/api/bodytrack/upload' % fluxtream_server]

    print 'Uploading %d data points to %s\'s account on server %s, device %s, channels %s' % (len(data), 
                                                                                              fluxtream_username,
                                                                                              fluxtream_server, 
                                                                                              dev_nickname,
                                                                                              channel_names)
    
    # If you're having trouble debugging this function, uncomment the following two print statements 
    # to see the exact curl command and result string
    #print '  Cmd=%s' % (cmd)
    result_str = subprocess.check_output(cmd)
    #print '  Result=%s' % (result_str)

    try:
        response = json.loads(result_str)
        if response['result'] != 'OK':
            raise Exception('Received non-OK response %s while trying to upload to %s' % (response, dev_nickname))
        
        print 'Upload to %s %s (%d rows, %d to %d) succeeded' % (dev_nickname, channel_names, len(data), data[0][0], data[-1][0])
    except:
        print "Attempt to upload to %s as user %s failed. Check that your credentials are ok" % (fluxtream_server, 
                                                                                                 fluxtream_username)
        print "Server returned response: %s" % (result_str)
        raise
    
# To get your own data, pass in the global fluxtream_guest_id which is computed 
# in setup_fluxtream_credentials() when you execute the Fluxtream login cell.
# To get a buddy's data, you first need to figure out what their Guest ID is.
# This will show up in the Chrome developer console in tile requests when you 
# look at their data in the timeline or BodyTrack app.  

# For example, if the test account is my buddy, I would select 
# 'View test test's data' from the upper right 
# hand menu, turn on developer tools, and go to the Fluxtream
# timeline tab.  In the developer tools' network tab I would 
# see fetches that look like:
#    7.21370.json
#    /api/bodytrack/tiles/1/BodyMedia.activityType
# The value between 'tiles' and the device_name.channel_name is
# that account's Guest ID.  In that case, I would call
# fluxtream_get_sources_list with an arg of 1.
def fluxtream_get_sources_list(guest_id):
    global fluxtream_server, fluxtream_username, fluxtream_password

    # Make sure we have fluxtream credentials set properly.  
    if not('fluxtream_server' in globals() and 
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials.  See above.")

    # Call the BodyTrack sources list API.  The BodyTrack server APIs are documented at 
    #   https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    cmd += ['https://%s/api/bodytrack/users/%d/sources/list' % (fluxtream_server, guest_id)]

    result_str = subprocess.check_output(cmd)
    #print '  Result=%s' % (result_str)

    try:
        response = json.loads(result_str)
        print 'Read of sources list for guest_id=%d succeeded' % (guest_id)
        return response
    except:
        print "Attempt to read sources list from %s as user %s failed. Check that your credentials are ok" % (fluxtream_server, 
                                                                                                             fluxtream_username)
        print "Server returned response: %s" % (result_str)
        raise

def fluxtream_get_device_names(sources_list):
    device_names = []
    for dev in sources_list['sources']:
        device_names.append(dev['name'])
        
    return device_names

def fluxtream_get_device_info(device_name, sources_list):
    for dev in sources_list['sources']:
        if(dev['name'] == device_name):
            return dev
        
    return None

def fluxtream_get_channel_names(device_name, sources_list):
    dev_info = fluxtream_get_device_info(device_name, sources_list)

    # Return an empty list if the requested device isn't in sources_list
    if not dev_info:
        return []

    channel_names = []
    
    for channel in dev_info['channels']:
        channel_names.append(channel['name'])
        
    return channel_names

def fluxtream_get_channel_info(device_name, channel_name, sources_list):
    dev_info = fluxtream_get_device_info(device_name, sources_list)
    
    # Check to make sure that we found info for the requested device.
    # If not, return None
    if not dev_info:
        return None
    
    for channel_info in dev_info['channels']:
        if(channel_info['name'] == channel_name):
            return channel_info
        
    return None

# Takes a guest_id, an array of <device_name>.<channel_name> strings, and a time range and returns a CSV reader.
# Iterate over the rows using reader.next(), which returns a row array with entries corresponding to 
#   Epoch, [dev_ch_names]
# Where Epoch is the epoch timestamp (aka unixtime) for the values in the row, and the i+1'th column of the row 
# corresponds to the channel in dev_ch_names[i]

# See comment on fluxtream_get_sources_list for info about how to choose the value for guest_id
def fluxtream_get_csv(guest_id, dev_ch_names, start_time, end_time):
    global fluxtream_server, fluxtream_username, fluxtream_password

    # Make sure we have fluxtream credentials set properly.  
    if not('fluxtream_server' in globals() and 
           'fluxtream_username' in globals() and
           'fluxtream_password' in globals()):
        raise Exception("Need to enter Fluxtream credentials.  See above.")

    # Call the BodyTrack CSV export API.  The BodyTrack server APIs are documented at 
    #   https://fluxtream.atlassian.net/wiki/display/FLX/BodyTrack+server+APIs
    cmd = ['curl', '-v']
    cmd += ['-u', '%s:%s' % (fluxtream_username, fluxtream_password)]
    # Need to convert the dev_ch_names array into json and URL encode it to create the channels arg
    # TODO: how do we confirm that dev_ch_names is in fact an array?
    ch_spec_str = json.dumps(dev_ch_names)
    ch_spec_str = urllib.quote(ch_spec_str)
    cmd += ['https://%s/api/bodytrack/exportCSV/%d/fluxtream-export-from-%d-to-%d.csv?channels=%s' % (fluxtream_server, guest_id, 
                                                                                                      int(start_time), int(end_time), 
                                                                                                      ch_spec_str)]
    #print '  cmd=%s' % (cmd)
    result_str = subprocess.check_output(cmd)
    #print '  Result=%s' % (result_str)
    # If the API call worked, result_str should be a CSV file
    # with the first line a header consisting of EpochTime, [dev_ch_names]
    # TODO: how do we check if it did work?

    # Create a CSV reader that iterates over the lines of the response
    csv_reader = csv.reader(result_str.splitlines(), delimiter=',')
    header = csv_reader.next()
    
    # Do some checks to make sure we got something reasonable
    if len(header) != len(dev_ch_names)+1:
        raise Exception("Expected header for CSV export of %s to contain %d columns, but only found %d.  Please double check that dev_ch_names are all valid" % (dev_ch_names, len(dev_ch_names)+1, len(header)))

    # Check the columns are what we expect
    for i in range(0,len(dev_ch_names)):
        if(dev_ch_names[i] != header[i+1]):
            raise Exception("Expected column %d of CSV header to be %s, but found %s instead.  Please double check that dev_ch_names are all valid" % (i+1, dev_ch_names[i], header[i+1]))
            
    # At this point, we can be confident that the columns map to Epoch, [dev_ch_names] as expected.
    # Return the csv reader.  Iterate over the rows using reader.next()
    return csv_reader
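
The header checks in fluxtream_get_csv can be tried offline, without a server, by feeding a literal CSV string through the same logic. This is a sketch under assumptions: check_csv_header is a hypothetical helper, the sample text is made up, and the 'EpochTime' column name is taken from the comment in the function above rather than from a real export. It uses next(reader) instead of reader.next() so that it runs under both Python 2 and Python 3.

```python
import csv


def check_csv_header(csv_text, dev_ch_names):
    """Parse csv_text and verify the header is <time column> + dev_ch_names.

    Returns the remaining data rows, each a list of strings.
    """
    reader = csv.reader(csv_text.splitlines(), delimiter=',')
    header = next(reader)
    if len(header) != len(dev_ch_names) + 1:
        raise ValueError("Expected %d columns, found %d"
                         % (len(dev_ch_names) + 1, len(header)))
    for i, name in enumerate(dev_ch_names):
        if header[i + 1] != name:
            raise ValueError("Column %d is %s, expected %s"
                             % (i + 1, header[i + 1], name))
    return list(reader)


# Hypothetical export text; a real export comes from the server
sample = "EpochTime,Nonin3150.SpO2\n1400000000,95\n1400000001,92\n"
rows = check_csv_header(sample, ['Nonin3150.SpO2'])
```

Every value in rows is a string, which is why compute_channel_fn later converts the source value with float().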

In [2]:
# Execute and fill in the fields below to set your Fluxtream credentials.  

from IPython.html import widgets # Widget definitions
from IPython.display import display # Used to display widgets in the notebook

def set_fluxtream_password(this):
    global fluxtream_username, fluxtream_password
    fluxtream_username = fluxtream_username_widget.value
    fluxtream_password = fluxtream_password_widget.value
    fluxtream_password_widget.value = ''
    setup_fluxtream_credentials()

    print "To make persistent for future restarts, insert a cell, paste in:"
    print ""
    print "global fluxtream_username, fluxtream_password"
    print "fluxtream_username = \"%s\"" % (fluxtream_username)
    print "fluxtream_password = \"xxx\""
    print "setup_fluxtream_credentials()"
    print ""
    print "replace xxx with your password, and execute that cell instead."
    print "Only do this if you're keeping this copy of your iPython notebook private,"
    print "and remove that cell before sharing"    
    
display(widgets.HTMLWidget(value='Fluxtream Username'))
fluxtream_username_widget = widgets.TextWidget()
display(fluxtream_username_widget)
display(widgets.HTMLWidget(value='Fluxtream Password'))
fluxtream_password_widget = widgets.TextWidget()
display(fluxtream_password_widget)

set_fluxtream_login_button = widgets.ButtonWidget(description='Set Fluxtream credentials')
set_fluxtream_login_button.on_click(set_fluxtream_password)
display(set_fluxtream_login_button)

# Enter Fluxtream username and password and click "Set Fluxtream credentials" button.  
# Password field will blank afterwards, but variables will be set


Verified credentials for user rsargent on fluxtream.org work. Guest ID=14
To make persistent for future restarts, insert a cell, paste in:

global fluxtream_username, fluxtream_password
fluxtream_username = "rsargent"
fluxtream_password = "xxx"
setup_fluxtream_credentials()

replace xxx with your password, and execute that cell instead.
Only do this if you're keeping this copy of your iPython notebook private,
and remove that cell before sharing

In [3]:
# Execute to list the devices and channel names available in this Fluxtream account
# Note that you will need to re-execute this cell after new data is uploaded 
# if you want to use up-to-date time bounds for a given channel
import pprint

global fluxtream_guest_id, guest_id, sources_list, dev_name_list

# Default to using the data of the currently logged in user.
# To use data for someone else, modify the line below to set
# guest_id to some other value.  See the comments above the
# fluxtream_get_sources_list definition for details.
guest_id = fluxtream_guest_id 

# Get the info about all the sources that this user has
# in their account
sources_list = fluxtream_get_sources_list(guest_id)
# Uncomment this if you want to see more details about the 
# structure of sources_list
#pprint.pprint(sources_list)

# Get the list of devices and channel names for this guest
dev_name_list = fluxtream_get_device_names(sources_list)
for dev_name in dev_name_list:
    channel_names = fluxtream_get_channel_names(dev_name, sources_list)
    print "Device '%s', %d channels:" % (dev_name, len(channel_names))
    print "  %s" % (channel_names)


Read of sources list for guest_id=14 succeeded
Device 'ACHD_Avalon', 18 channels:
  [u'BP_MM_HG', u'H2S_PPM', u'INT_T_DEGC', u'OUT_T_DEGC', u'PM25B_UG_M3', u'SIGTHETA_DEG', u'SO2_PPM', u'SONICWD_DEG', u'SONICWS_MPH', u'BP_MM_HG', u'H2S_PPM', u'INT_T_DEGC', u'OUT_T_DEGC', u'PM25B_UG_M3', u'SIGTHETA_DEG', u'SO2_PPM', u'SONICWD_DEG', u'SONICWS_MPH']
Device 'ACHD_Court_House', 3 channels:
  [u'CO_PPM', u'INT_T_DEGC', u'CO_PPM']
Device 'ACHD_Flag_Plaza', 7 channels:
  [u'CO_PPM', u'INT_T_DEGC', u'OUT_T_DEGC', u'PER_F_PERCENT', u'PM10_UG_M3', u'PER_F_PERCENT', u'PM10_UG_M3']
Device 'ACHD_Glassport_High_Street', 2 channels:
  [u'PER_F_PERCENT', u'PM10_UG_M3']
Device 'ACHD_Harrison_Township', 11 channels:
  [u'INT_T_DEGC', u'NO2_PPM', u'NOX_PPM', u'NO_PPM', u'OZONE2_PPM', u'OZONE_PPM', u'NO2_PPM', u'NOX_PPM', u'NO_PPM', u'OZONE2_PPM', u'OZONE_PPM']
Device 'ACHD_Lawrenceville', 26 channels:
  [u'BP_MM_HG', u'INT_T_DEGC', u'NO2_PPB', u'NOX_PPB', u'NO_PPB', u'OUT_RH_PERCENT', u'OUT_T_DEGC', u'OZONE2_PPM', u'OZONE_PPM', u'PER_F2_PERCENT', u'PM25_2__UG_M3', u'PM25_UG_M3', u'RWD_DEG', u'RWS_MPH', u'SIGTHETA_DEG', u'SONICWD_DEG', u'SONICWS_MPH', u'NO2_PPB', u'NOX_PPB', u'NO_PPB', u'OUT_RH_PERCENT', u'PER_F2_PERCENT', u'PM25_2__UG_M3', u'PM25_UG_M3', u'RWD_DEG', u'RWS_MPH']
Device 'ACHD_Lawrenceville_2', 13 channels:
  [u'CO_PPB', u'INT_T_DEGC', u'NOYDIF_PPB', u'NOY_PPB', u'NO_PPB', u'PM10B_UG_M3', u'PM25B_UG_M3', u'SO2_PPB', u'CO_PPB', u'NOYDIF_PPB', u'NOY_PPB', u'PM10B_UG_M3', u'SO2_PPB']
Device 'ACHD_Liberty', 7 channels:
  [u'H2S_PPM', u'INT_T_DEGC', u'OUT_T_DEGC', u'SIGTHETA_DEG', u'SO2_PPM', u'SONICWD_DEG', u'SONICWS_MPH']
Device 'ACHD_Liberty_2', 7 channels:
  [u'PM10_FL_PERCENT', u'PM10_UG_M3', u'PM25_2__UG_M3', u'PM25_FL_PERCENT', u'PM25_UG_M3', u'PM10_FL_PERCENT', u'PM25_FL_PERCENT']
Device 'ACHD_Lincoln', 4 channels:
  [u'PER_F2_PERCENT', u'PER_F_PERCENT', u'PM10_UG_M3', u'PM25_UG_M3']
Device 'ACHD_Monroeville', 4 channels:
  [u'BP_MM_HG', u'INT_T_DEGC', u'OUT_T_DEGC', u'PM10B_UG_M3']
Device 'ACHD_North_Braddock', 8 channels:
  [u'BP_MM_HG', u'INT_T_DEGC', u'OUT_T_DEGC', u'PM10B_UG_M3', u'SIGTHETA_DEG', u'SO2_PPM', u'SONICWD_DEG', u'SONICWS_MPH']
Device 'ACHD_South_Fayette', 8 channels:
  [u'INT_T_DEGC', u'OUT_T_DEGC', u'OZONE2_PPM', u'OZONE_PPM', u'SIGTHETA_DEG', u'SO2_PPM', u'SONICWD_DEG', u'SONICWS_MPH']
Device 'ACHD_West_Allegheny', 2 channels:
  [u'H2S_PPB', u'H2S_PPB']
Device 'Anne_PolarStrap', 5 channels:
  [u'BeatSpacing', u'HeartBeat', u'HeartRate', u'BeatSpacing', u'HeartRate']
Device 'BodyMedia', 14 channels:
  [u'activityType', u'caloriesBurned', u'efficiency', u'estimatedCalories', u'lying', u'mets', u'onBody', u'predictedCalories', u'sleeping', u'stepsGraph', u'totalCalories', u'totalLying', u'totalSleeping', u'totalSteps']
Device 'BodyMedia_old', 28 channels:
  [u'activityType', u'caloriesBurned', u'efficiency', u'estimatedCalories', u'lying', u'mets', u'onBody', u'predictedCalories', u'sleeping', u'stepsGraph', u'totalCalories', u'totalLying', u'totalSleeping', u'totalSteps', u'activityType', u'caloriesBurned', u'efficiency', u'estimatedCalories', u'lying', u'mets', u'onBody', u'predictedCalories', u'sleeping', u'stepsGraph', u'totalCalories', u'totalLying', u'totalSleeping', u'totalSteps']
Device 'Fitbit', 36 channels:
  [u'bmi', u'caloriesIntraday', u'caloriesOut', u'elevation', u'fairlyActiveMinutes', u'fat', u'floors', u'floorsIntraday', u'levelsIntraday', u'lightlyActiveDistance', u'lightlyActiveMinutes', u'loggedActivitiesDistance', u'metsIntraday', u'moderatelyActiveDistance', u'sedentaryActiveDistance', u'sedentaryMinutes', u'steps', u'stepsIntraday', u'totalDistance', u'trackerDistance', u'veryActiveDistance', u'veryActiveMinutes', u'weight', u'sleep', u'stepsIntraday', u'caloriesIntraday', u'levelsIntraday', u'metsIntraday', u'floorsIntraday', u'caloriesIn', u'water', u'caloriesInGoal', u'caloriesOutGoal', u'weight', u' fat', u' bmi']
Device 'FluxtreamCapture', 70 channels:
  [u'AccelX', u'AccelY', u'AccelZ', u'Altitude', u'AppResidentMemory', u'AppTotalCPUUsage', u'AppUserCPUUsage', u'AppVirtualMemory', u'BackgroundTimeRemaining', u'Course', u'Drift', u'HorizontalAccuracy', u'InBackground', u'Lag', u'Latitude', u'Longitude', u'MobileBatteryCharging', u'MobileBatteryLevel', u'RotW', u'RotX', u'RotY', u'RotZ', u'Speed', u'SystemActiveMemory', u'SystemFreeMemory', u'SystemWiredMemory', u'SystemwideActiveMemory', u'SystemwideFreeMemory', u'SystemwideTotalCPUUsage', u'SystemwideUserCPUUsage', u'SystemwideWiredMemory', u'TimeZoneName', u'TimeZoneOffset', u'VerticalAccuracy', u'photo', u'AccelX', u'AccelY', u'AccelZ', u'Altitude', u'AppResidentMemory', u'AppTotalCPUUsage', u'AppUserCPUUsage', u'AppVirtualMemory', u'BackgroundTimeRemaining', u'Course', u'Drift', u'HorizontalAccuracy', u'InBackground', u'Lag', u'Latitude', u'Longitude', u'MobileBatteryCharging', u'MobileBatteryLevel', u'RotW', u'RotX', u'RotY', u'RotZ', u'Speed', u'SystemActiveMemory', u'SystemFreeMemory', u'SystemWiredMemory', u'SystemwideActiveMemory', u'SystemwideFreeMemory', u'SystemwideTotalCPUUsage', u'SystemwideUserCPUUsage', u'SystemwideWiredMemory', u'TimeZoneName', u'TimeZoneOffset', u'VerticalAccuracy', u'photo']
Device 'Hexoskin', 11 channels:
  [u'AccelX', u'AccelY', u'AccelZ', u'BeatSpacing', u'HeartRate', u'RespAbdomen', u'RespChest', u'TidalVolume', u'RespAbdomen', u'RespChest', u'TidalVolume']
Device 'Netatmo_Basement', 6 channels:
  [u'H2O_ppm', u'RH', u'Temp_F', u'H2O_ppm', u'RH', u'Temp_F']
Device 'Netatmo_Bedroom', 3 channels:
  [u'H2O_ppm', u'RH', u'Temp_F']
Device 'Netatmo_CREATE_Lab', 3 channels:
  [u'H2O_ppm', u'RH', u'Temp_F']
Device 'Netatmo_CREATE_MR', 3 channels:
  [u'H2O_ppm', u'RH', u'Temp_F']
Device 'Netatmo_Office', 3 channels:
  [u'H2O_ppm', u'RH', u'Temp_F']
Device 'Netatmo_Porch', 3 channels:
  [u'H2O_ppm', u'RH', u'Temp_F']
Device 'Nonin3150', 8 channels:
  [u'Pulse', u'SpO2', u'SpO2_thresh_90', u'SpO2_thresh_93', u'Pulse', u'SpO2', u'SpO2_thresh_90', u'SpO2_thresh_93']
Device 'PolarStrap', 2 channels:
  [u'BeatSpacing', u'HeartRate']
Device 'Smells_1', 5 channels:
  [u'Any_notes', u'Rate_the_smell_1_5', u'Rate_the_smell_1_5', u'Any_notes', u'1_No_smell_2_Barely_noticeable_3_Definitely_noticeable_4_It_s_pretty_bad_5_About_as_bad_as_it_gets']
Device 'SnoreLab', 2 channels:
  [u'SnoreIntensity', u'SnoreIntensity']
Device 'Speck04191928011013000000', 6 channels:
  [u'humidity', u'particles', u'raw_particles', u'humidity', u'particles', u'raw_particles']
Device 'Speck42271924050513000000', 5 channels:
  [u'humidity', u'particles', u'raw_particles', u'temperature', u'temperature']
Device 'Withings', 8 channels:
  [u'cuffHeartRate', u'diastolic', u'fatFreeMass', u'fatMassWeight', u'fatRatio', u'scaleHeartRate', u'systolic', u'weight']
Device 'calibration_speck', 4 channels:
  [u'humidity', u'particles', u'raw_particles', u'temperature']
Device 'junk', 40 channels:
  [u'BP', u'CO', u'H2S', u'INT_T', u'NO', u'NO2', u'NOX', u'OUT_RH', u'OUT_T', u'OZONE', u'OZONE2', u'PER_F', u'PER_F2', u'PM10', u'PM25', u'PM25B', u'SIGTHETA', u'SO2', u'SONICWD', u'SONICWS', u'BP', u'CO', u'H2S', u'INT_T', u'NO', u'NO2', u'NOX', u'OUT_RH', u'OUT_T', u'OZONE', u'OZONE2', u'PER_F', u'PER_F2', u'PM10', u'PM25', u'PM25B', u'SIGTHETA', u'SO2', u'SONICWD', u'SONICWS']
Device 'stoop_speck_130409', 4 channels:
  [u'humidity', u'particles', u'raw_particles', u'temperature']
Device 'Last_FM', 1 channels:
  [u'tracks']
Device 'Calendar', 1 channels:
  [u'entries']
Device 'All', 1 channels:
  [u'photo']
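
Several of the channel lists in the output above contain repeated names (for example, 'CO_PPM' appears twice under ACHD_Court_House). If you only want each name once, a small order-preserving filter is enough. This is a sketch: unique_in_order is a hypothetical helper rather than part of the Fluxtream API, and it does not address why the server reports the duplicates.

```python
def unique_in_order(names):
    """Return names with duplicates removed, keeping first-seen order."""
    seen = set()
    result = []
    for name in names:
        if name not in seen:
            seen.add(name)
            result.append(name)
    return result


# Channel list copied from the ACHD_Court_House line of the output above
court_house = ['CO_PPM', 'INT_T_DEGC', 'CO_PPM']
unique_channels = unique_in_order(court_house)
```

In the listing cell, this could be applied to the result of fluxtream_get_channel_names before printing.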

In [4]:
# Modify the values below for setting up which source 
# channels you want to process, and where to put the resulting computed values.  

# The naming scheme is <dev_name>.<channel_name> to specify a given channel of a given device.
# Change the keys of channel_proc_map to the channel names you want to use for input.  
# Change the values in channel_proc_map to the channel name you want to use for output
#   of the values computed for a given input channel.
# Execute to set up channel_proc_map based on those settings.

# The output of the cell above shows the device and channel names available
# for the guest_id you selected.

global guest_id, sources_list, dev_name_list, channel_proc_map, channel_info_map

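# Note: a dict can hold only one destination per source channel.  Listing the same
# source key twice silently keeps only the last entry, so to compute a second
# threshold (e.g. Nonin3150.SpO2_thresh_93), change the value below and re-run.
channel_proc_map = {'Nonin3150.SpO2': 'Nonin3150.SpO2_thresh_90'}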
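
One thing to watch for when editing channel_proc_map: a Python dict literal that lists the same key twice keeps only the last value, so a single source channel can't be mapped to two destination channels this way. The sketch below demonstrates that behavior and shows one possible alternative, a list of (source, dest) pairs grouped by source. The names channel_proc_pairs and dests_by_source are hypothetical, and the processing cell in this notebook expects a plain dict, so it would need changes before it could consume them.

```python
# A dict literal with a repeated key keeps only the last value
repeated = {'Nonin3150.SpO2': 'Nonin3150.SpO2_thresh_93',
            'Nonin3150.SpO2': 'Nonin3150.SpO2_thresh_90'}

# Hypothetical alternative: list the pairs, then group dests by source
channel_proc_pairs = [('Nonin3150.SpO2', 'Nonin3150.SpO2_thresh_93'),
                      ('Nonin3150.SpO2', 'Nonin3150.SpO2_thresh_90')]

dests_by_source = {}
for source_ch, dest_ch in channel_proc_pairs:
    # setdefault creates the list on first sight of a source channel
    dests_by_source.setdefault(source_ch, []).append(dest_ch)
```

Without such a change, the simplest approach is to run the notebook once per threshold, editing the destination channel name each time.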

In [5]:
# Modify this cell to change the function computed for a given source channel.
# This one computes how much an SpO2 reading drops below a threshold.  

# The input args are the epoch timestamp, the source value, the source channel name, 
# and the destination channel name.  In this case, we don't care about the timestamp 
# and parse the threshold to use from the channel name.

# Note that the type of source_value may be a string, since some sources of input, 
# such as CSV readers, are unable to distinguish the type of a value and always return 
# strings.  If you need to treat the value as a numeric type, you'll 
# need to convert it yourself
def compute_channel_fn(timestamp, source_value, source_ch_name, dest_ch_name):
    thresh = 93.0
    
    dest_ch_elts = dest_ch_name.split('.')[1].split('_')
    if len(dest_ch_elts) == 3 and dest_ch_elts[0] == "SpO2" and dest_ch_elts[1] == "thresh":
        thresh = float(dest_ch_elts[2])
    
    #print "Thresh = %d (dest channel = %s)" % (thresh, dest_ch_name)
        
    source_f = float(source_value)
    if(source_f>thresh):
        return None
    else:
        dest_f = thresh-source_f
        #print "Source = %f, dest = %f" % (source_f, dest_f)
        return dest_f
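
To see what compute_channel_fn does to individual readings, the two steps it combines (parsing the threshold out of the destination channel name, then computing the drop below that threshold) can be tried on their own. This is a sketch that mirrors the logic above; the helper names parse_threshold and drop_below and the sample readings are hypothetical.

```python
def parse_threshold(dest_ch_name, default=93.0):
    """Extract X from '<device>.SpO2_thresh_X', else return default."""
    elts = dest_ch_name.split('.')[1].split('_')
    if len(elts) == 3 and elts[0] == 'SpO2' and elts[1] == 'thresh':
        return float(elts[2])
    return default


def drop_below(source_value, thresh):
    """Return thresh - value for readings at or below thresh, else None."""
    source_f = float(source_value)  # CSV values arrive as strings
    if source_f > thresh:
        return None
    return thresh - source_f


thresh = parse_threshold('Nonin3150.SpO2_thresh_90')
below = drop_below('88', thresh)     # reading under the threshold
above = drop_below('95', thresh)     # reading over the threshold
at_edge = drop_below('90', thresh)   # reading exactly at the threshold
```

Here thresh is 90.0, below is 2.0, above is None, and at_edge is 0.0. A reading exactly at the threshold therefore produces a data point with value 0.0 rather than being skipped.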

In [6]:
# Modify this cell to choose whether to just run compute_channel_fn for 
# new data (timestamps from the source_ch that are > the max timestamp 
# in dest_ch) (recompute_all=False), or force a recompute of everything
# (recompute_all=True).

# If compute_channel_fn is the same as it's been and you just want to run it 
# on new data that's come in, do the former (recompute_all=False).

# If you've changed compute_channel_fn and want to run it on everything, 
# do the latter (recompute_all=True)
recompute_all=False
#recompute_all=True
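
The effect of recompute_all on the time range that gets processed can be checked in isolation. The sketch below restates the selection rule used by the processing cell that follows; choose_time_range is a hypothetical helper, and the min_time/max_time values are made-up epoch timestamps rather than real channel info.

```python
def choose_time_range(source_info, dest_info, recompute_all):
    """Return (start_time, end_time) to process, or None if nothing to do."""
    # End time is always the latest timestamp in the source channel
    end_time = source_info['max_time']
    # Start from the beginning of the source channel by default
    start_time = source_info['min_time']
    # If the dest channel exists and we only want new data, start where it ends
    if not recompute_all and dest_info is not None:
        start_time = dest_info['max_time']
    if end_time <= start_time:
        return None
    return (start_time, end_time)


# Hypothetical channel info values
source_info = {'min_time': 1000.0, 'max_time': 5000.0}
dest_info = {'min_time': 1000.0, 'max_time': 3000.0}

new_only = choose_time_range(source_info, dest_info, recompute_all=False)
everything = choose_time_range(source_info, dest_info, recompute_all=True)
first_run = choose_time_range(source_info, None, recompute_all=False)
up_to_date = choose_time_range(source_info, {'max_time': 5000.0}, False)
```

With these values, new_only covers 3000.0 to 5000.0, everything and first_run both cover 1000.0 to 5000.0, and up_to_date is None because the destination channel already extends to the end of the source channel.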

In [8]:
# Execute this cell to read in the selected range of data from all the 
# source channels based on recompute_all, run compute_channel_fn on each 
# data point, and upload the result to the dest channel

global guest_id, sources_list, dev_name_list, channel_proc_map, channel_info_map, csv_reader_map

# Update sources_list in case there's new data available on any 
# of the relevant channels (current min_time and max_time for each channel are
# part of what gets returned by the Fluxtream get sources API call)
sources_list = fluxtream_get_sources_list(guest_id)

# Populate channel_info_map with keys for each source/dest channel name
# and values with the info on that channel as provided by sources_list
channel_info_map = {}
for dev_ch_name in channel_proc_map.keys() + channel_proc_map.values():
    # Split the device and channel names
    dev_ch_name_elts = dev_ch_name.split('.')
    # Get the channel_info object for this
    channel_info = fluxtream_get_channel_info(dev_ch_name_elts[0], dev_ch_name_elts[1], sources_list)
    if channel_info is None:
        # This is ok for a destination channel, but not a source channel
        if dev_ch_name in channel_proc_map.keys():
            raise Exception("Can't find channel info for source channel %s; recheck device and channel names list" % dev_ch_name)
        else:
            print "Can't find channel info for dest channel %s; will create it" % dev_ch_name
            
    # Store the channel info in channel_info_map
    channel_info_map[dev_ch_name]=channel_info
    
print ""
print "Successfully setup channel_info_map"
print ""

# Next, iterate over the source channels.  For each, compute time ranges to process.
# If the dest channel doesn't exist, use the full time range of the source channel.
# If the dest channel does exist, by default just compute the data points that are 
# later than the last timestamp in the dest channel.  

# Read the CSV files for all the source channels.
# The keys of csv_reader_map are the source channels.
# The values are csv_reader objects.  Each call to 
# csv_reader.next() will return a row consisting of 
#   t, source_ch[t]
# Where t is the epoch timestamp (aka unixtime) for the sensor reading in that row, 
# and source_ch[t] is the value of the source sensor channel at time t

csv_reader_map = {}
for dev_ch_name in channel_proc_map.keys():
    source_ch = dev_ch_name
    source_info = channel_info_map[source_ch]
    dest_ch = channel_proc_map[dev_ch_name]
    dest_info = channel_info_map[dest_ch]
    
    # End time is always the max_time for the source channel
    end_time = source_info['max_time']
    # Start time is min_time for the source channel if the
    # dest_ch doesn't exist yet or if recompute_all is set.
    # Otherwise it is max_time of the dest channel, so only new
    # values are computed
    start_time = source_info['min_time']
    if not recompute_all and dest_info is not None:
        start_time = dest_info['max_time']
        print "Processing only new data in %s that isn't in %s: %f to %f" %(source_ch, dest_ch, start_time, end_time)
    else:
        print "Processing all data for %s: %f to %f" % (source_ch, start_time, end_time)
        
    if end_time <= start_time:
        print "  No time range to process for guest %d, channel %s" % (guest_id, dev_ch_name)
    else:
        # Get a CSV for the source channel for the desired time range
        csv_reader_map[source_ch] = fluxtream_get_csv(guest_id, [source_ch], start_time, end_time)
        print "Successfully read data for guest %d, channel %s: %f to %f" % (guest_id, source_ch, start_time, end_time)

print ""
print "Done: Read CSV data for %d source channels: %s" % (len(csv_reader_map.keys()), csv_reader_map.keys())
print ""

# Now process the data from all the source channels and upload 
# the results to the corresponding destination channels.

# The csv_reader objects the previous cell created are used in this cell to 
# compute and upload data.  

# Note that you need to re-execute the previous cell each time before executing this one,
# because iterating over a csv_reader object in the loop below consumes its entries,
# so they are not available to a subsequent run of the loop.


# Define a function for doing a partial upload of data to a given dest_ch
def partial_upload(dest_ch, upload_data):
    if len(upload_data)>0:
        # For upload, we need to split the device and channel name 
        # pieces of dest_ch apart, and put the channel name part in 
        # an array
        dest_dev_nickname, dest_ch_name = dest_ch.split('.')
        # print "Uploading %d data points to dest %s" % (len(upload_data), dest_ch)
        fluxtream_upload(dest_dev_nickname, [dest_ch_name], upload_data)
    else:
        # No data
        print "No data to upload for dest %s" % (dest_ch)
            
# For each csv_reader in csv_reader_map, call
#   compute_channel_fn(timestamp, source_value, source_ch_name, dest_ch_name)
# on every row, collect the results in an output data list, and upload it to the
# Fluxtream server and account set up in the credentials entry section above
for source_ch in csv_reader_map.keys():
    # Get the name of the output channel from channel_proc_map
    dest_ch = channel_proc_map[source_ch]

    print "Processing %s -> %s" % (source_ch, dest_ch)

    # Retrieve the csv_reader object for this source channel from csv_reader_map,
    # which was set up in the previous loop
    csv_reader = csv_reader_map[source_ch]
    
    # Iterate over the rows in the CSV file for the source channel.
    # Call compute_channel_fn on each, and append each result that is
    # not None to upload_data along with its timestamp.
    
    # We may need to do this in multiple batches if there are too many rows for 
    # a reasonable upload.

    # Set up the upload data list
    upload_data = []

    for row in csv_reader:
        # Make sure the row has two entries: timestamp and source value,
        # and read them into local variables
        assert len(row) == 2
        timestamp = float(row[0])
        source_value = row[1]
        comp_val = compute_channel_fn(timestamp, source_value, source_ch, dest_ch)

        if comp_val is not None:
            #print "%d (%f): %s -> %d" % (csv_reader.line_num, timestamp, source_value, comp_val)
            upload_data.append([timestamp,comp_val])
            # Check whether upload_data is big enough that we should upload now,
            # and if so clear upload_data afterwards
            if len(upload_data) >= 1000:
                partial_upload(dest_ch, upload_data)
                upload_data = []
                    
    # Upload any remaining rows in upload_data
    partial_upload(dest_ch, upload_data)
    
print ""
print "Done: Uploaded computed data for %d source channels: %s" % (len(csv_reader_map.keys()), csv_reader_map.keys())


Read of sources list for guest_id=14 succeeded

Successfully setup channel_info_map

Processing only new data in Nonin3150.SpO2 that isn't in Nonin3150.SpO2_thresh_90: 1427357063.000000 to 1427799406.000000
Successfully read data for guest 14, channel Nonin3150.SpO2: 1427357063.000000 to 1427799406.000000

Done: Read CSV data for 1 source channels: ['Nonin3150.SpO2']

Processing Nonin3150.SpO2 -> Nonin3150.SpO2_thresh_90
Uploading 1000 data points to rsargent's account on server fluxtream.org, device Nonin3150, channels ['SpO2_thresh_90']
Upload to Nonin3150 ['SpO2_thresh_90'] (1000 rows, 1427357063 to 1427779000) succeeded
Uploading 1000 data points to rsargent's account on server fluxtream.org, device Nonin3150, channels ['SpO2_thresh_90']
Upload to Nonin3150 ['SpO2_thresh_90'] (1000 rows, 1427779001 to 1427794536) succeeded
Uploading 1000 data points to rsargent's account on server fluxtream.org, device Nonin3150, channels ['SpO2_thresh_90']
Upload to Nonin3150 ['SpO2_thresh_90'] (1000 rows, 1427794537 to 1427797390) succeeded
Uploading 244 data points to rsargent's account on server fluxtream.org, device Nonin3150, channels ['SpO2_thresh_90']
Upload to Nonin3150 ['SpO2_thresh_90'] (244 rows, 1427797391 to 1427799365) succeeded

Done: Uploaded computed data for 1 source channels: ['Nonin3150.SpO2']
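
The batching in the loop above (collect computed rows, upload every 1000, then upload the remainder) can be tried without a Fluxtream account. The following is only a minimal sketch of that pattern, not the notebook's actual code: upload_in_batches, threshold_below, and the in-memory rows list are hypothetical names made up for illustration, upload_fn stands in for partial_upload, and the threshold function is an assumed stand-in for compute_channel_fn. Unlike partial_upload, this sketch silently skips an empty final batch instead of printing a message.

```python
def upload_in_batches(rows, compute_fn, upload_fn, batch_size=1000):
    """Apply compute_fn to each (timestamp, value) row and pass the
    non-None results to upload_fn in lists of at most batch_size rows.

    upload_fn is a hypothetical stand-in for partial_upload; it receives
    one list of [timestamp, computed_value] pairs per call.
    Returns the total number of rows handed to upload_fn.
    """
    batch = []
    total = 0
    for row in rows:
        timestamp = float(row[0])
        comp_val = compute_fn(timestamp, row[1])
        if comp_val is None:
            continue
        batch.append([timestamp, comp_val])
        if len(batch) >= batch_size:
            upload_fn(batch)
            total += len(batch)
            batch = []  # start a fresh list; the uploaded one is left untouched
    if batch:
        # Upload whatever is left over after the last full batch
        upload_fn(batch)
        total += len(batch)
    return total


def threshold_below(timestamp, value, threshold=90):
    # Hypothetical compute function: keep only readings under the threshold
    value = float(value)
    return value if value < threshold else None


# In-memory rows of strings, shaped like the [t, source_ch[t]] rows of a CSV reader
rows = [[str(1427357063 + i), str(85 + i % 10)] for i in range(2500)]
sent = []  # each "upload" just records the batch it was given
total = upload_in_batches(rows, threshold_below, sent.append)
```

Here half of the 2500 synthetic readings fall below 90, so sent ends up holding one batch of 1000 rows and one of 250, and total is 1250. Passing the upload function in as an argument keeps the batching logic testable separately from the network call.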
