In [1]:
    
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
    
Place the cursor in the next cell and add the credentials object. This holds the Cloudant NoSQL DB connectivity details.
In [2]:
    
# @hidden_cell
credentials_1 = {
  'username':'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix',
  'password':"""5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e""",
  'host':'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com',
  'port':'443',
  'url':'https://e0436527-e028-46f3-9090-a4fce60bffc5-bluemix:5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e@e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com'
}
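    
The # @hidden_cell comment marks the cell as hidden so that the credentials are not exposed when the notebook is shared.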
    
Load the data from the Cloudant NoSQL DB into a Spark DataFrame. Replace the database name to match yours.
In [46]:
    
cloudantdata = sqlContext.read.format("com.cloudant.spark").\
    option("cloudant.host", credentials_1['host']).\
    option("cloudant.username", credentials_1['username']).\
    option("cloudant.password", credentials_1['password']).\
    load("iotp_9ts1nd_default_2016-12")
    
Select only the required columns. For example, select only the timestamp and the temperature.
In [47]:
    
df = cloudantdata.selectExpr("data.ts as timestamp", "data.d.temperature as temperature")
df.take(1)
    
Convert the Spark DataFrame into a pandas DataFrame. The Python Data Analysis Library (a.k.a. pandas) provides high-performance, easy-to-use data structures and data analysis tools that are designed to make working with relational or labeled data both easy and intuitive. It aims to be the fundamental high-level building block for doing practical, real-world data analysis in Python.
In [48]:
    
import pandas as pd
pandaDF = df.toPandas()
# Replace NA/NaN values with 0
pandaDF.fillna(0, inplace=True)
pandaDF.columns
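    
Note that toPandas() collects the entire result set into the notebook driver's memory, so this approach suits moderately sized datasets.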
    
Run the following code in the next cell to make the timestamp the index.
In [49]:
    
# Use the first column (timestamp) as the index and the second as the value
header_list = pandaDF.columns.values
timeHeaderName = header_list[0]
valueHeaderName = header_list[1]
# Parse the timestamp strings and set them as the index
pandaDF[timeHeaderName] = pd.to_datetime(pandaDF[timeHeaderName])
pandaDF.index = pandaDF[timeHeaderName]
# Drop the timestamp column as the index now carries the timestamp
pandaDF = pandaDF.drop([timeHeaderName], axis=1)
# Also, sort the rows by the timestamp index
pandaDF.sort_index(inplace=True)
 
pandaDF.head(n=5)
    
Calculate the z-score for each of the values and add it as a new column in the same DataFrame. The z-score is a standard score that indicates how many standard deviations an element is from the mean. A higher absolute z-score represents a larger deviation from the mean, which can be interpreted as abnormal.
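For illustration, assume the temperature readings average 25°C with a standard deviation of 2°C: a reading of 32°C then has a z-score of (32 − 25) / 2 = 3.5 and is flagged as a spike, while a reading of 24°C scores −0.5 and is treated as normal.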
In [50]:
    
# calculate the z-score and populate a new column
pandaDF['zscore'] = (pandaDF[valueHeaderName] - pandaDF[valueHeaderName].mean())/pandaDF[valueHeaderName].std(ddof=0)
'''
This function detects spikes and dips by returning TRUE when the
z-score is above 3 (spike) or below -3 (dip), and FALSE otherwise.
In case you want to capture smaller spikes and dips, lower the
threshold from 3 to 2 in this function.
'''
def spike(row):
    if row["zscore"] >= 3 or row["zscore"] <= -3:
        return "TRUE"
    else:
        return "FALSE"
 
pandaDF['anomaly'] = pandaDF.apply(spike, axis=1)
pandaDF = pandaDF.drop("zscore", axis=1)
pandaDF.head(n=5)
    
Create a credentials object with the Object Storage connectivity details; it will be used to store the resulting DataFrame as a CSV file.
In [64]:
    
# @hidden_cell
credentials_2 = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_97449055_a948_4667_bca9_5991990f2420',
  'project_id':'557362dff79e4f92979dcee1dd7747ec',
  'region':'dallas',
  'user_id':'2a130fc399da4c46a9505509a3efaae0',
  'domain_id':'3f9e1d4d81ef447295c928e0c5edc185',
  'domain_name':'804347',
  'username':'member_7f033e289d99e2e3466f398fb962bd82e6e4993a',
  'password':"""vI^DY4=Hoy1gAlSY""",
  'container':'DSXandWatsonIoTIntegration',
  'tenantId':'undefined',
  'filename':'temp-data.csv'
}
    
Store the DataFrame in a CSV file. It will then be uploaded to the Object Storage.
In [65]:
    
# Note: index=False omits the timestamp index from the CSV; remove the option to keep it
pandaDF.to_csv('processeddata.csv', index=False)
#cloudantdata.write.format('com.databricks.spark.csv').options(header='true').save(fileNameOut)
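    
Next, define a helper function that authenticates against the Identity (Keystone) V3 endpoint of the Object Storage service and uploads the local file into the container named in the credentials.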
    
In [66]:
    
import requests
import json
def put_file(credentials, local_file_name):
    """This function uploads a local file to
    Bluemix Object Storage V3 via its REST API."""
    with open(local_file_name, 'r') as f:
        my_data = f.read()
    # Authenticate against the OpenStack Identity (Keystone) V3 endpoint
    url1 = ''.join(['https://identity.open.softlayer.com', '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': credentials['username'], 'domain': {'id': credentials['domain_id']},
            'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    # Locate the public object-store endpoint for the configured region
    for e1 in resp1_body['token']['catalog']:
        if e1['type'] == 'object-store':
            for e2 in e1['endpoints']:
                if e2['interface'] == 'public' and e2['region'] == credentials['region']:
                    url2 = ''.join([e2['url'], '/', credentials['container'], '/', local_file_name])
    # Upload the file content using the token issued above
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.put(url=url2, headers=headers2, data=my_data)
    print(resp2)
    
In [67]:
    
put_file(credentials_2,'processeddata.csv')
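    
A printed <Response [201]> indicates that the file was successfully created in the Object Storage container.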
    
    