node_red_dsx_workflow


Derive insights on Olympics data using Python Pandas

Expose an integration point using websockets for orchestration with Node-RED.

1. Setup

To prepare your environment, you need to install some packages.

1.1 Install the necessary packages

You need the latest versions of these packages:

  • websocket-client: is a python client for the Websockets.
  • python-swiftclient: is a python client for the Swift API.

Install the websocket client:


In [ ]:
!pip install websocket-client

Install IBM Bluemix Object Storage Client:


In [ ]:
!pip install python-swiftclient

1.2 Import packages and libraries

Import the packages and libraries that you'll use:


In [ ]:
import pandas as pd
import matplotlib.pyplot as plt
import json
import websocket
import thread
import time
import swiftclient
import codecs
from io import StringIO

2. Configuration

Add configurable items of the notebook below

2.1 Add your service credentials for Object Storage

You must create Object Storage service on Bluemix. To access data in a file in Object Storage, you need the Object Storage authentication credentials. Insert the Object Storage authentication credentials as credentials_1 in the following cell after removing the current contents in the cell.


In [ ]:

2.3 Global Variables

Add global variables.


In [ ]:
olympics_data_filename = 'olympics.csv'
dictionary_data_filename = 'dictionary.csv'

3. Persistence and Storage

3.1 Configure Object Storage Client


In [ ]:
auth_url = credentials_1['auth_url']+"/v3"
container = credentials_1["container"]

IBM_Objectstorage_Connection = swiftclient.Connection(
    key=credentials_1['password'], authurl=auth_url, auth_version='3', os_options={
        "project_id": credentials_1['project_id'], "user_id": credentials_1['user_id'], "region_name": credentials_1['region']})

def create_container(container_name):
    """ Create a container on Object Storage.
    """
    x = IBM_Objectstorage_Connection.put_container(container_name)
    return x

def put_object(container_name, fname, contents, content_type):
    """ Write contents to Object Storage.
    """
    x = IBM_Objectstorage_Connection.put_object(
        container_name,
        fname,
        contents,
        content_type)
    return x

def get_object(container_name, fname):
    """ Retrieve contents from Object Storage.
    """
    Object_Store_file_details = IBM_Objectstorage_Connection.get_object(
        container_name, fname)
    return Object_Store_file_details[1]

4. Data

4.1 Prepare data

Combine the olympics and dictionary data into a single dataframe:

  • Read olympics data from Object Storage.
  • Rename columns
  • Populate the data in the dictionary to the Olympics data with a merge


In [ ]:
olympics = pd.read_csv(StringIO(get_object(container, olympics_data_filename).decode('utf-8')))
olympics = olympics.rename(columns = {'Country':'Code'})
olympics = olympics.rename(columns = {'Year':'Edition'})
dictionary = pd.read_csv(StringIO(get_object(container, dictionary_data_filename).decode('utf-8')))
olympics = pd.merge(olympics, dictionary, on='Code')
olympics.head()

5. Insights on the data using Python Pandas

  • Create re-usable functions

In [ ]:
def get_medals_gb_year_country():
    """ Group by edition and country and sum medals count.
    """
    medals_groupedBy_yearCountry = olympics.groupby(['Edition','Code']).apply(lambda country: country['Code'].count())
    return medals_groupedBy_yearCountry

def get_medals_gb_year_country_medal():
    """ Group by edition, country, medal type and sum medals count.
    """
    medals_groupedBy_yearCountryMedal = olympics.groupby(['Edition', 'Code', 'Medal']).apply(lambda country: country['Medal'].count())
    return medals_groupedBy_yearCountryMedal

def get_medals_last_10_years(countrycode):
    """ Get Gold, Silver and Bronze medals for a country for last 10 editions.
    """
    last10pics = olympics['Edition'].unique()
    yrs = pd.Series(last10pics).nlargest(10)
    df = pd.DataFrame([], columns=['Year', 'Gold', 'Silver', 'Bronze'])
    medalsdf = get_medals_gb_year_country_medal()
   
    for yr in yrs:
        medaltally = medalsdf[yr][countrycode]
        gold = 0
        silver = 0
        bronze = 0
        if 'Gold' in medaltally:
            gold = medaltally['Gold']
        if 'Silver' in medaltally:
            silver = medaltally['Silver']
        if 'Bronze' in medaltally:
            bronze =  medaltally['Bronze']
        df1 = pd.DataFrame([[yr,gold, silver, bronze]], columns=['Year', 'Gold', 'Silver', 'Bronze'])
        df = df.append(df1, ignore_index=True) 
    df = df.sort_values(by=['Year'], ascending=True)    
    df = df.reset_index()
    del df['index']
    return df

def get_correlation_medalstally():
    """ Get correlation between the medals tally and population, GDP per capita.
    """
    df = get_medals_gb_year_country()
    values  = get_all_olympic_years().values
    size = values.size
    correlations = []
    for i in range(size):
        year = values[i][0]
        df1 = df[year].to_frame(name="Tally")
        df1 = df1.reset_index()
        df2 = pd.merge(df1,dictionary, on='Code')
        corrpop = df2.corr().values[0][1]
        corrgdp = df2.corr().values[0][2]
        resp = {"Year": year, "Population":corrpop, "GDP":corrgdp}
        correlations.append(resp)
    return correlations  

def get_medals_category(countrycode, year):
    """ Get the medals count in different sports category for a country in an edition.
    """
    df = olympics[olympics['Edition'] ==  year]
    df1 = df[df['Code'] == countrycode]
    df2 = df1.groupby(['Sport']).apply(lambda country: country['Medal'].count())
    return df2

def get_medals_category_all(countrycode):  
    """ Get the medals count in different sports category for a country for last ten editions.
    """
    df1 = olympics[olympics['Code'] == countrycode]
    df2 = df1.groupby(['Sport']).apply(lambda country: country['Medal'].count())
    return df2

def get_top_ten_gold_tally(year):
    """ Get the top ten gold medal winning countries in an edition.
    """
    df = olympics[olympics['Edition'] ==  year]
    df1 = df[df['Medal'] == 'Gold']
    df2 = df1.groupby(['Code']).apply(lambda country: country['Medal'].count())
    return df2

def get_top_ten_total_tally(year):
    """ Get the top ten total medal winning countries in an edition.
    """
    df = olympics[olympics['Edition'] ==  year]
    df1 = df.groupby(['Code']).apply(lambda country: country['Medal'].count())
    return df1

def get_year_venue():
    """ Get edition venue matrix.
    """
    df = olympics[['Edition', 'City']]
    df = df.drop_duplicates()
    df = df.reset_index()
    df = df.set_index('Edition')
    del df['index']
    return df.sort_index()

def get_all_olympic_years():
    """ Get list of all olympic editions.
    """
    df = olympics['Edition']
    df = df.drop_duplicates()
    df = df.reset_index()
    del df['index']
    return df.sort_index()

def get_all_countries():
    """ Get list of all countries.
    """
    df = olympics[['Code','Country']]
    df = df.drop_duplicates()
    df = df.reset_index()
    del df['index']
    return df.sort(['Country'],ascending=[True])

def get_country_edition_data(countrycode,edition):
    """ Get data for a country and edition.
    """
    df = olympics[olympics["Code"] == countrycode]
    df1 = df[df["Edition"] == edition]
    return df1

6. Expose integration point with a websocket client


In [ ]:
def on_message(ws, message):
    print(message)
    msg = json.loads(message)
    cmd = msg['cmd']
    
    if cmd == 'MBY':
        country = msg['country']
        tally = get_medals_last_10_years(country)    
        tallyarray=[]
        for i, row in tally.iterrows():
            medaltally = {"Year":int(row["Year"]),
                          "Gold":int(row["Gold"]),
                          "Silver":int(row["Silver"]),
                          "Bronze":int(row["Bronze"])}
            tallyarray.append(medaltally)
        wsresponse = {}
        wsresponse["forcmd"] = "MBY" 
        wsresponse["response"] = tallyarray
        ws.send(json.dumps(wsresponse))
    elif cmd == 'MBSC':
        country = msg['country']
        year = 2008
        response = get_medals_category(country, year)
        
        ct = response.count()
        if ct > 5:
            response = response.nlargest(5)    
        
        medals = []
        categories = []
        for i, row in response.iteritems():
            categories.append(i)
            medals.append(row)   
  
        wsresponse = {}
        wsresponse["forcmd"] = "MBSC"
        wsresponse["response"] = { "categories":categories, "medals":medals}         
        ws.send(json.dumps(wsresponse))
    elif cmd == 'MBSA':
        country = msg['country']
        response = get_medals_category_all(country)
        
        ct = response.count()
        if ct > 5:
            response = response.nlargest(5)    
        
        medals = []
        categories = []
        for i, row in response.iteritems():
            categories.append(i)
            medals.append(row)   
  
        wsresponse = {}
        wsresponse["forcmd"] = "MBSA"
        wsresponse["response"] = { "categories":categories, "medals":medals}         
        ws.send(json.dumps(wsresponse))    
    elif cmd == 'T10G':
        edition = msg["edition"]
        response = get_top_ten_gold_tally(edition)
        ct = response.count()
        if ct > 10:
            response = response.nlargest(10)
        medals = []
        for i, row in response.iteritems():
            data = {"country":i,"tally":row}
            medals.append(data)  
        wsresponse = {}
        wsresponse["forcmd"] = "T10G"
        wsresponse["response"] = medals   
        print(wsresponse)
        ws.send(json.dumps(wsresponse))     
    elif cmd == 'T10M':
        year = msg["edition"]
        response = get_top_ten_total_tally(year)
        ct = response.count()
        if ct > 10:
            response = response.nlargest(10)
        medals = []
        for i, row in response.iteritems():
            data = {"country":i,"tally":row}
            medals.append(data)  
        wsresponse = {}
        wsresponse["forcmd"] = "T10M"
        wsresponse["response"] = medals   
        print(wsresponse)
        ws.send(json.dumps(wsresponse)) 
    elif cmd == 'CORR':
        corr = get_correlation_medalstally() 
        wsresponse = {}
        wsresponse["forcmd"] = "CORR"
        wsresponse["response"] = corr
        ws.send(json.dumps(wsresponse)) 
    elif cmd == 'YV':   
        yearvenue = get_year_venue()
        yearvenuearray = []
        for i in range(yearvenue.size):
            value = {"Year":yearvenue.index[i],"Venue":yearvenue.values[i].tolist()[0]}
            yearvenuearray.append(value)
        responsejson = {}
        responsejson["forcmd"]="YV"
        responsejson["response"]=yearvenuearray
        ws.send(json.dumps(responsejson))               
    elif cmd == 'DATA':
        country = msg['country']
        edition = msg['edition']
        olympicsslice = get_country_edition_data(country,edition)
        data = []
        numofcolumns = olympicsslice.columns.size
        cols = []
        values = []
        for column in olympicsslice.columns:
            cols.append(column)
        for value in olympicsslice.values:
            values.append(value.tolist()) 
        data = {"cols":cols,"vals":values}    
        responsejson = {}
        responsejson['forcmd']='DATA'
        responsejson['response']= data
        ws.send(json.dumps(responsejson)) 
    elif cmd == 'EDITIONS':
        years = get_all_olympic_years()
        yearsarray = []
        for i,row in years.iteritems():
            for value in row:
                yearsarray.append(value)
        length = len(yearsarray)
        wsresponse = []
        for i in range(length):
            year = {"text":yearsarray[i],"value":yearsarray[i]}
            wsresponse.append(year)
        responsejson = {}
        responsejson['forcmd']='EDITIONS'
        responsejson['response']= wsresponse 
        ws.send(json.dumps(responsejson)) 
    elif cmd == 'COUNTRIES':
        countries = get_all_countries()
        countriesarray = []
        codearray = []
        for i,row in countries.iteritems():
            if i=='Code':
                for value in row:
                    codearray.append(value)
            elif i=='Country':  
                for value in row:
                    countriesarray.append(value)
        length = len(codearray)
        wsresponse = []
        for i in range(length):
            country = {"text":countriesarray[i],"value":codearray[i]}
            wsresponse.append(country)
        responsejson = {}
        responsejson['forcmd']='COUNTRIES'
        responsejson['response']= wsresponse 
        ws.send(json.dumps(responsejson))  

def on_error(ws, error):
    print(error)

def on_close(ws):
    ws.send("DSX Listen End")

def on_open(ws):
    def run(*args):
        for i in range(10000):
            hbeat = '{"cmd":"Olympics DSX HeartBeat"}'
            ws.send(hbeat)
            time.sleep(100)
            
    thread.start_new_thread(run, ())


def start_websocket_listener():
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://NODERED_BASE_URL/ws/orchestrate",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

7. Start websocket client


In [ ]:
start_websocket_listener()

In [ ]: