In [1]:
from pyspark.sql import SQLContext
# sc (the SparkContext) is predefined in the notebook environment
sqlContext = SQLContext(sc)
Place the cursor in the next cell and add the credentials object, which holds the Cloudant NoSQL DB connectivity details
In [2]:
# @hidden_cell
credentials_1 = {
    'username': 'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix',
    'password': """5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e""",
    'host': 'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com',
    'port': '443',
    'url': 'https://e0436527-e028-46f3-9090-a4fce60bffc5-bluemix:5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e@e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com'
}
Load the data from the Cloudant NoSQL DB into a Spark DataFrame. Replace the database name to match yours
In [46]:
cloudantdata = sqlContext.read.format("com.cloudant.spark").\
    option("cloudant.host", credentials_1['host']).\
    option("cloudant.username", credentials_1['username']).\
    option("cloudant.password", credentials_1['password']).\
    load("iotp_9ts1nd_default_2016-12")
Select only the required columns, for example the timestamp and the temperature.
In [47]:
df = cloudantdata.selectExpr("data.ts as timestamp", "data.d.temperature as temperature")
df.take(1)
Out[47]:
Convert the Spark DataFrame into a pandas DataFrame. The Python Data Analysis Library (a.k.a. pandas) provides high-performance, easy-to-use data structures and data analysis tools designed to make working with relational or labeled data both easy and intuitive. It aims to be the fundamental high-level building block for doing practical, real-world data analysis in Python.
In [48]:
import pandas as pd

pandaDF = df.toPandas()
# Fill NA/NaN values with 0
pandaDF.fillna(0, inplace=True)
pandaDF.columns
Out[48]:
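Zero-filling is simple, but a literal 0 can bias the mean and standard deviation that the z-score uses later. If that matters for your sensor data, an alternative (a sketch, not part of the original recipe) is to drop incomplete rows instead:
# Alternative: discard rows with missing readings rather than zero-filling
pandaDF = df.toPandas().dropna()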
Run the following code in the next cell to set the timestamp as the index
In [49]:
# Use the first column as the timestamp and the second as the value
header_list = pandaDF.columns.values
timeHeaderName = header_list[0]
valueHeaderName = header_list[1]
# Convert the timestamp column to datetime and use it as the index
pandaDF[timeHeaderName] = pd.to_datetime(pandaDF[timeHeaderName])
pandaDF.index = pandaDF[timeHeaderName]
# Drop the timestamp column, as the index now carries the timestamp
pandaDF = pandaDF.drop([timeHeaderName], axis=1)
# Also, sort the rows by the timestamp index
pandaDF.sort_index(inplace=True)
pandaDF.head(n=5)
Out[49]:
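With a DatetimeIndex in place, time-based operations become available. For example (an optional, hypothetical step), the readings can be resampled to hourly means for a quick look at the trend:
hourly = pandaDF[valueHeaderName].resample('1H').mean()  # hourly average temperature
hourly.head()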
Calculate z-score for each of the values and add it as a new column in the same DataFrame. Z-score is a standard score that indicates how many standard deviations an element is from the mean. A higher z-score value represents a larger deviation from the mean value which can be interpreted as abnormal.
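For intuition, here is a tiny standalone example with made-up values; the outlier at the end gets by far the largest z-score:
import pandas as pd
s = pd.Series([20, 21, 19, 20, 45])  # hypothetical temperature readings
z = (s - s.mean()) / s.std(ddof=0)   # same formula as used below
print(z.round(2))                    # the value 45 stands out clearly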
In [50]:
# calculate z-score and populate a new column
pandaDF['zscore'] = (pandaDF[valueHeaderName] - pandaDF[valueHeaderName].mean())/pandaDF[valueHeaderName].std(ddof=0)
'''
This function flags a spike or a dip by returning "TRUE" when the
z-score is above 3 (spike) or below -3 (dip), and "FALSE" otherwise.
In case you want to capture smaller spikes and dips, lower the
z-score threshold from 3 to 2 in this function.
'''
def spike(row):
    if row["zscore"] >= 3 or row["zscore"] <= -3:
        return "TRUE"
    else:
        return "FALSE"
pandaDF['anomaly'] = pandaDF.apply(spike, axis=1)
pandaDF = pandaDF.drop("zscore", axis=1)
pandaDF.head(n=5)
Out[50]:
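To list only the flagged readings, you can filter on the new column (an optional check):
pandaDF[pandaDF['anomaly'] == "TRUE"]  # show only rows flagged as anomalous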
Create a credentials object for the Object Storage service, into which the resulting DataFrame will be stored as a CSV file
In [64]:
# @hidden_cell
credentials_2 = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project': 'object_storage_97449055_a948_4667_bca9_5991990f2420',
    'project_id': '557362dff79e4f92979dcee1dd7747ec',
    'region': 'dallas',
    'user_id': '2a130fc399da4c46a9505509a3efaae0',
    'domain_id': '3f9e1d4d81ef447295c928e0c5edc185',
    'domain_name': '804347',
    'username': 'member_7f033e289d99e2e3466f398fb962bd82e6e4993a',
    'password': """vI^DY4=Hoy1gAlSY""",
    'container': 'DSXandWatsonIoTIntegration',
    'tenantId': 'undefined',
    'filename': 'temp-data.csv'
}
Write the DataFrame to a local CSV file; the next cells upload it to the Object Storage.
In [65]:
pandaDF.to_csv('processeddata.csv',index=False)
#cloudantdata.write.format('com.databricks.spark.csv').options(header='true').save(fileNameOut)
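As an optional sanity check (not part of the original recipe), you can read the file back before uploading it. Note that index=False omits the timestamp index from the CSV; pass index=True in the cell above if the timestamps should be preserved.
check = pd.read_csv('processeddata.csv')
print(check.head())  # confirm the columns and values look right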
In [66]:
import requests
import json

def put_file(credentials, local_file_name):
    """Upload a local file to Bluemix Object Storage V3
    and return the HTTP response of the PUT request."""
    with open(local_file_name, 'r') as f:
        my_data = f.read()
    # Authenticate against the Identity service to obtain a token
    url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
                                  'password': {'user': {'name': credentials['username'],
                                                        'domain': {'id': credentials['domain_id']},
                                                        'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    # Find the public object-store endpoint for the Dallas region
    for e1 in resp1_body['token']['catalog']:
        if e1['type'] == 'object-store':
            for e2 in e1['endpoints']:
                if e2['interface'] == 'public' and e2['region'] == 'dallas':
                    url2 = ''.join([e2['url'], '/', credentials['container'], '/', local_file_name])
    # Upload the file content using the issued token
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.put(url=url2, headers=headers2, data=my_data)
    print(resp2)
    return resp2
In [67]:
put_file(credentials_2,'processeddata.csv')
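put_file prints and returns the HTTP response; a 201 (Created) status means the object was stored in the container. To verify programmatically, you can capture the return value (the PUT is idempotent, so re-running simply overwrites the same object):
resp = put_file(credentials_2, 'processeddata.csv')
print(resp.status_code)  # expect 201 on a successful upload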
In [ ]: