In [ ]:
!pip install websocket-client
Install the IBM Bluemix Object Storage client (python-swiftclient):
In [ ]:
!pip install python-swiftclient
In [ ]:
import pandas as pd
import matplotlib.pyplot as plt
import json
import websocket
import thread
import time
import swiftclient
import codecs
from io import StringIO
You must first create an Object Storage service on Bluemix. To read a file stored in Object Storage, you need that service's authentication credentials. Remove the current contents of the following cell, then insert the Object Storage authentication credentials there as `credentials_1`.
In [ ]:
In [ ]:
# Object names of the two CSV files read from the Object Storage container:
# the medal table and the per-country dictionary.
olympics_data_filename, dictionary_data_filename = 'olympics.csv', 'dictionary.csv'
In [ ]:
# Build the Keystone v3 auth endpoint and the swift connection from the
# service credentials pasted into `credentials_1` in the cell above.
auth_url = credentials_1['auth_url'] + "/v3"
container = credentials_1["container"]
IBM_Objectstorage_Connection = swiftclient.Connection(
    key=credentials_1['password'],
    authurl=auth_url,
    auth_version='3',
    os_options={
        "project_id": credentials_1['project_id'],
        "user_id": credentials_1['user_id'],
        "region_name": credentials_1['region'],
    },
)
def create_container(container_name):
    """Create ``container_name`` in Object Storage.

    Thin wrapper over ``IBM_Objectstorage_Connection.put_container``; its
    return value is passed straight back to the caller.
    """
    return IBM_Objectstorage_Connection.put_container(container_name)
def put_object(container_name, fname, contents, content_type):
    """Write ``contents`` to object ``fname`` inside ``container_name``.

    Args:
        container_name: target container.
        fname: object name to write.
        contents: data to upload.
        content_type: MIME type to store with the object.

    Returns:
        Whatever ``IBM_Objectstorage_Connection.put_object`` returns.
    """
    # content_type must be passed by keyword: the 4th positional parameter of
    # swiftclient's Connection.put_object is content_length, so passing it
    # positionally sent the MIME type as the Content-Length.
    return IBM_Objectstorage_Connection.put_object(
        container_name,
        fname,
        contents,
        content_type=content_type)
def get_object(container_name, fname):
    """Fetch object ``fname`` from ``container_name`` and return its body.

    ``IBM_Objectstorage_Connection.get_object`` returns a sequence whose
    second element is kept (presumably the ``(headers, body)`` pair from
    swiftclient — confirm if the client changes).
    """
    response = IBM_Objectstorage_Connection.get_object(container_name, fname)
    body = response[1]
    return body
In [ ]:
# Load the medal table, renaming its Country/Year columns to Code/Edition so
# they line up with the dictionary file.
olympics = pd.read_csv(
    StringIO(get_object(container, olympics_data_filename).decode('utf-8'))
).rename(columns={'Country': 'Code', 'Year': 'Edition'})
dictionary = pd.read_csv(
    StringIO(get_object(container, dictionary_data_filename).decode('utf-8')))
# NOTE(review): pd.merge defaults to an inner join, so medal rows whose Code
# has no entry in the dictionary file are dropped here -- confirm intended.
olympics = pd.merge(olympics, dictionary, on='Code')
olympics.head()
In [ ]:
def get_medals_gb_year_country():
    """Count medal rows per (Edition, Code) in ``olympics``.

    Returns:
        A Series indexed by an (Edition, Code) MultiIndex whose values are
        the number of ``olympics`` rows for that country in that edition.
    """
    # size() counts the rows in each group. That equals the old
    # apply(lambda g: g['Code'].count()), because Code is a group key and is
    # never null inside a group. It also avoids DataFrameGroupBy.apply on
    # grouping columns, which is deprecated since pandas 2.2.
    return olympics.groupby(['Edition', 'Code']).size()
def get_medals_gb_year_country_medal():
    """Count medals per (Edition, Code, Medal) in ``olympics``.

    Returns:
        A Series indexed by an (Edition, Code, Medal) MultiIndex holding the
        number of rows for each medal type a country won in an edition.
        Rows with a null Medal are not counted (groupby drops null keys).
    """
    # Medal is a group key, so the row count per group equals the old
    # apply(lambda g: g['Medal'].count()). This form avoids the deprecated
    # (pandas 2.2+) DataFrameGroupBy.apply over grouping columns.
    return olympics.groupby(['Edition', 'Code', 'Medal']).size()
def get_medals_last_10_years(countrycode, num_editions=10):
    """Return a country's Gold/Silver/Bronze counts for the latest editions.

    Args:
        countrycode: value matched against the ``Code`` level of
            ``get_medals_gb_year_country_medal()``.
        num_editions: how many of the most recent distinct editions in
            ``olympics`` to report (default 10, as before).

    Returns:
        A DataFrame with columns Year, Gold, Silver, Bronze: one row per
        edition, sorted by ascending Year, with a fresh 0..n-1 index.
        Editions in which the country has no medal rows get zero counts.
    """
    recent_editions = pd.Series(olympics['Edition'].unique()).nlargest(num_editions)
    medalsdf = get_medals_gb_year_country_medal()
    rows = []
    for yr in recent_editions:
        try:
            # Partial index: Series keyed by Medal for this edition/country.
            medaltally = medalsdf[yr][countrycode]
        except KeyError:
            # The country won nothing in this edition. Previously this
            # KeyError escaped and aborted the whole request.
            medaltally = {}
        rows.append([yr,
                     medaltally.get('Gold', 0),
                     medaltally.get('Silver', 0),
                     medaltally.get('Bronze', 0)])
    # Build the frame in one go. DataFrame.append was removed in pandas 2.0.
    df = pd.DataFrame(rows, columns=['Year', 'Gold', 'Silver', 'Bronze'])
    return df.sort_values(by=['Year'], ascending=True).reset_index(drop=True)
def get_correlation_medalstally():
    """Correlate each edition's per-country medal tally with dictionary data.

    For every edition from ``get_all_olympic_years()``, the per-country
    counts from ``get_medals_gb_year_country()`` (column ``Tally``) are merged
    with ``dictionary`` on ``Code``. A correlation matrix (``DataFrame.corr``,
    Pearson by default) is then computed over the numeric columns.

    Returns:
        A list of ``{"Year": int, "Population": float, "GDP": float}`` dicts,
        one per edition. "Population" and "GDP" are the correlations of
        Tally with the 2nd and 3rd numeric columns of the merged frame.
        NOTE(review): these are assumed to be the dictionary's population
        and GDP-per-capita columns, by position -- confirm against the CSV.
    """
    tallies = get_medals_gb_year_country()
    correlations = []
    for year in get_all_olympic_years()['Edition']:
        df1 = tallies[year].to_frame(name="Tally").reset_index()
        df2 = pd.merge(df1, dictionary, on='Code')
        # pandas >= 2.0 raises on string columns in corr(). Older pandas
        # silently used only the numeric ones, so select those explicitly.
        # The matrix is computed once instead of twice.
        corr = df2.select_dtypes(include='number').corr().values
        correlations.append({"Year": int(year),  # plain int is JSON-serializable
                             "Population": float(corr[0][1]),
                             "GDP": float(corr[0][2])})
    return correlations
def get_medals_category(countrycode, year):
    """Count a country's medals per sport in one edition.

    Args:
        countrycode: value matched against ``olympics['Code']``.
        year: value matched against ``olympics['Edition']``.

    Returns:
        A Series indexed by Sport holding the number of non-null Medal
        entries per sport. It is an empty Series when nothing matches.
        With ``groupby().apply`` an empty selection could come back as an
        empty DataFrame, which broke callers testing ``count() > n``.
    """
    selected = (olympics['Edition'] == year) & (olympics['Code'] == countrycode)
    return olympics[selected].groupby('Sport')['Medal'].count()
def get_medals_category_all(countrycode):
    """Count a country's medals per sport across every edition in ``olympics``.

    Args:
        countrycode: value matched against ``olympics['Code']``.

    Returns:
        A Series indexed by Sport holding the number of non-null Medal
        entries per sport over all editions (no edition filter is applied).
        It is an empty Series when the country has no rows.
    """
    by_country = olympics[olympics['Code'] == countrycode]
    return by_country.groupby('Sport')['Medal'].count()
def get_top_ten_gold_tally(year):
    """Count gold medals per country in one edition.

    Despite the name, every country with at least one gold row in ``year``
    is returned. Callers pick the leaders themselves (e.g. ``nlargest(10)``).

    Returns:
        A Series indexed by Code with the number of Gold rows per country.
        It is an empty Series when the edition has no gold rows.
    """
    golds = olympics[(olympics['Edition'] == year) & (olympics['Medal'] == 'Gold')]
    return golds.groupby('Code')['Medal'].count()
def get_top_ten_total_tally(year):
    """Count all medals per country in one edition.

    Despite the name, every country with rows in ``year`` is returned.
    Callers pick the leaders themselves (e.g. ``nlargest(10)``).

    Returns:
        A Series indexed by Code with the number of non-null Medal entries
        per country. It is an empty Series when the edition has no rows.
    """
    in_edition = olympics[olympics['Edition'] == year]
    return in_edition.groupby('Code')['Medal'].count()
def get_year_venue():
    """Map each edition to its host city.

    Returns:
        A DataFrame indexed by Edition (ascending) with a single City column,
        one row per distinct (Edition, City) pair found in ``olympics``.
    """
    venues = olympics[['Edition', 'City']].drop_duplicates()
    venues = venues.set_index('Edition')
    return venues.sort_index()
def get_all_olympic_years():
    """List the distinct editions present in ``olympics``.

    Returns:
        A one-column DataFrame (column Edition) with a 0..n-1 index. Editions
        appear in order of first occurrence in ``olympics``; no sorting by
        year is applied.
    """
    editions = olympics['Edition'].drop_duplicates()
    return editions.reset_index(drop=True).to_frame()
def get_all_countries():
    """List every distinct (Code, Country) pair in ``olympics``, by Country.

    Returns:
        A DataFrame with Code and Country columns, sorted ascending by
        Country. As before, the index holds each pair's position before
        sorting.
    """
    countries = olympics[['Code', 'Country']].drop_duplicates().reset_index(drop=True)
    # DataFrame.sort was removed in pandas 0.20; sort_values replaces it.
    return countries.sort_values(by=['Country'], ascending=[True])
def get_country_edition_data(countrycode, edition):
    """Return the ``olympics`` rows for one country in one edition.

    Rows are kept where Code equals ``countrycode`` and Edition equals
    ``edition``. The original index, row order and all columns are preserved.
    """
    wanted = (olympics["Code"] == countrycode) & (olympics["Edition"] == edition)
    return olympics[wanted]
In [ ]:
def _json_default(obj):
    """json.dumps fallback: turn NumPy scalars (anything with .item()) into Python values."""
    item = getattr(obj, 'item', None)
    if callable(item):
        return item()
    raise TypeError("Object of type %s is not JSON serializable" % type(obj).__name__)


def _limit(series, n):
    """Return the n largest entries of series, but only when it holds more than n."""
    return series.nlargest(n) if series.count() > n else series


def _categories_payload(series):
    """Split a per-sport Series (top 5) into parallel category / medal lists."""
    top = _limit(series, 5)
    return {"categories": list(top.index), "medals": list(top)}


def _tally_payload(series):
    """Turn a per-country Series (top 10) into [{"country", "tally"}, ...]."""
    top = _limit(series, 10)
    return [{"country": code, "tally": tally} for code, tally in top.items()]


def _handle_mby(msg):
    """MBY: medal breakdown for the country's most recent editions."""
    tally = get_medals_last_10_years(msg['country'])
    return [{"Year": int(row["Year"]),
             "Gold": int(row["Gold"]),
             "Silver": int(row["Silver"]),
             "Bronze": int(row["Bronze"])}
            for _, row in tally.iterrows()]


def _handle_mbsc(msg):
    """MBSC: top sports for the country in the fixed 2008 edition."""
    return _categories_payload(get_medals_category(msg['country'], 2008))


def _handle_mbsa(msg):
    """MBSA: top sports for the country across all editions."""
    return _categories_payload(get_medals_category_all(msg['country']))


def _handle_t10g(msg):
    """T10G: top gold-medal countries for the requested edition."""
    return _tally_payload(get_top_ten_gold_tally(msg["edition"]))


def _handle_t10m(msg):
    """T10M: top total-medal countries for the requested edition."""
    return _tally_payload(get_top_ten_total_tally(msg["edition"]))


def _handle_corr(msg):
    """CORR: per-edition correlations of tally with dictionary columns."""
    return get_correlation_medalstally()


def _handle_yv(msg):
    """YV: [{"Year", "Venue"}] for every edition (first column of get_year_venue)."""
    venues = get_year_venue()
    return [{"Year": year, "Venue": venue}
            for year, venue in zip(venues.index, venues.iloc[:, 0])]


def _handle_data(msg):
    """DATA: raw rows (column names + row values) for a country/edition."""
    rows = get_country_edition_data(msg['country'], msg['edition'])
    return {"cols": list(rows.columns), "vals": rows.values.tolist()}


def _handle_editions(msg):
    """EDITIONS: dropdown entries {"text", "value"} for every edition."""
    years = get_all_olympic_years()
    return [{"text": year, "value": year}
            for _, column in years.items() for year in column]


def _handle_countries(msg):
    """COUNTRIES: dropdown entries {"text": country name, "value": code}."""
    countries = get_all_countries()
    return [{"text": name, "value": code}
            for code, name in zip(countries['Code'], countries['Country'])]


# Maps the incoming "cmd" to the function building that command's response.
_HANDLERS = {
    'MBY': _handle_mby,
    'MBSC': _handle_mbsc,
    'MBSA': _handle_mbsa,
    'T10G': _handle_t10g,
    'T10M': _handle_t10m,
    'CORR': _handle_corr,
    'YV': _handle_yv,
    'DATA': _handle_data,
    'EDITIONS': _handle_editions,
    'COUNTRIES': _handle_countries,
}

# Commands whose full response is also echoed to stdout before sending.
_PRINTED_RESPONSES = ('T10G', 'T10M')


def on_message(ws, message):
    """websocket-client message callback: answer one JSON command.

    ``message`` is a JSON object with a "cmd" key plus command-specific keys
    ("country", "edition"). The reply is sent back over ``ws`` as
    ``{"forcmd": cmd, "response": ...}``. Unknown or missing commands are
    ignored, and malformed JSON is reported and ignored. NumPy scalars in a
    response are converted to plain Python values; otherwise json.dumps
    raises on Python 3. The pandas helpers iterate with ``.items()``, because
    ``iteritems`` was removed in pandas 2.0.
    """
    print(message)
    try:
        msg = json.loads(message)
    except ValueError as err:
        print("Ignoring malformed message: %s" % err)
        return
    cmd = msg.get('cmd') if isinstance(msg, dict) else None
    try:
        handler = _HANDLERS.get(cmd)
    except TypeError:  # unhashable "cmd" (e.g. a JSON object or array)
        handler = None
    if handler is None:
        return
    wsresponse = {"forcmd": cmd, "response": handler(msg)}
    if cmd in _PRINTED_RESPONSES:
        print(wsresponse)
    ws.send(json.dumps(wsresponse, default=_json_default))
def on_error(ws, error):
    """websocket-client error callback: write the error's text to stdout."""
    text = "%s" % (error,)
    print(text)
def on_close(ws, *args):
    """websocket-client close callback: log that the listener has ended.

    ``*args`` absorbs the ``(close_status_code, close_msg)`` arguments that
    websocket-client >= 1.0 passes; older versions call ``on_close(ws)``.
    The marker is printed locally instead of sent. By the time this callback
    runs the connection is closed, so ``ws.send`` could only raise.
    """
    print("DSX Listen End")
def on_open(ws, heartbeat_interval=100, max_heartbeats=10000):
    """websocket-client open callback: start a background heartbeat.

    Sends ``{"cmd":"Olympics DSX HeartBeat"}`` right away and then every
    ``heartbeat_interval`` seconds, at most ``max_heartbeats`` times. The
    defaults match the previous hard-coded 100 s / 10000 beats.

    The sender is a daemon ``threading.Thread``, so it never keeps the
    interpreter alive; the Python 2-only ``thread`` module is not needed.
    The thread stops quietly once the connection is closed instead of dying
    with an unhandled exception.
    """
    import threading

    def run():
        hbeat = '{"cmd":"Olympics DSX HeartBeat"}'
        for _ in range(max_heartbeats):
            try:
                ws.send(hbeat)
            except websocket.WebSocketConnectionClosedException:
                break
            time.sleep(heartbeat_interval)

    heartbeat = threading.Thread(target=run)
    heartbeat.daemon = True
    heartbeat.start()
def start_websocket_listener(url="ws://NODERED_BASE_URL/ws/orchestrate", trace=True):
    """Connect to the Node-RED orchestration websocket and serve commands.

    Args:
        url: websocket endpoint. The default is the original placeholder;
            pass the real Node-RED URL instead of editing this function.
        trace: forwarded to ``websocket.enableTrace`` (default True, as before).

    Wires on_open / on_message / on_error / on_close into a
    ``websocket.WebSocketApp`` and then calls ``run_forever()``, so this
    call normally does not return until the connection ends.
    """
    websocket.enableTrace(trace)
    ws = websocket.WebSocketApp(url,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.run_forever()
In [ ]:
# Start the websocket listener. It ends in ws.run_forever(), so expect this
# cell to keep running for as long as the connection stays open.
start_websocket_listener()
In [ ]: