In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Place the cursor in the next cell and add the credentials object. This holds the Cloudant NoSQL DB connectivity details.


In [2]:
# @hidden_cell
credentials_1 = {
  'username':'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix',
  'password':"""5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e""",
  'host':'e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com',
  'port':'443',
  'url':'https://e0436527-e028-46f3-9090-a4fce60bffc5-bluemix:5a2715321a33c129ef05d63ef26099262a7e0d2d519bd00cee1db3994f0d531e@e0436527-e028-46f3-9090-a4fce60bffc5-bluemix.cloudant.com'
}

Load the data from the Cloudant NoSQL DB into a Spark DataFrame. Replace the database name to match yours.


In [46]:
cloudantdata=sqlContext.read.format("com.cloudant.spark").\
option("cloudant.host",credentials_1['host']).\
option("cloudant.username", credentials_1['username']).\
option("cloudant.password", credentials_1['password']).\
load("iotp_9ts1nd_default_2016-12")

Select only the required columns, for example the timestamp and the temperature.


In [47]:
df = cloudantdata.selectExpr("data.ts as timestamp", "data.d.temperature as temperature")

df.take(1)


Out[47]:
[Row(timestamp=u'2016-12-13T10:51:28.545+0530', temperature=17.357)]

Convert the Spark DataFrame into a pandas DataFrame. The Python Data Analysis Library (a.k.a. pandas) provides high-performance, easy-to-use data structures and data analysis tools designed to make working with relational or labeled data both easy and intuitive. It aims to be the fundamental high-level building block for doing practical, real-world data analysis in Python.


In [48]:
import pprint
import pandas as pd
pandaDF = df.toPandas()
# Replace NA/NaN values with 0
pandaDF.fillna(0, inplace=True)
pandaDF.columns


Out[48]:
Index([u'timestamp', u'temperature'], dtype='object')
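To illustrate what fillna(0, inplace=True) does, here is a minimal sketch on a hypothetical frame with one missing temperature reading (the names below are made up for the example):

```python
import numpy as np
import pandas as pd

# Hypothetical frame with one missing temperature reading
demo = pd.DataFrame({'timestamp': ['t1', 't2', 't3'],
                     'temperature': [17.4, np.nan, 17.5]})
# fillna(0, inplace=True) replaces the NaN with 0.0 in place
demo.fillna(0, inplace=True)
print(demo['temperature'].tolist())  # [17.4, 0.0, 17.5]
```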

Run the following code in the next cell to make the timestamp the index.


In [49]:
# Use the timestamp column as the index, if it is present
header_list = pandaDF.columns.values
timeHeaderName = header_list[0]
valueHeaderName = header_list[1]

# Convert the timestamp strings to datetimes and set them as the index,
# then drop the original timestamp column
pandaDF[timeHeaderName] = pd.to_datetime(pandaDF[timeHeaderName])
pandaDF.index = pandaDF[timeHeaderName]
pandaDF = pandaDF.drop([timeHeaderName], axis=1)
# Sort the rows by the timestamp index
pandaDF.sort_index(inplace=True)
 
pandaDF.head(n=5)


Out[49]:
                         temperature
timestamp
2016-12-13 04:21:42.871        17.47
2016-12-13 04:21:44.610        17.47
2016-12-13 04:21:46.120        17.44
2016-12-13 04:21:47.808        17.44
2016-12-13 04:21:49.648        17.38
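The same re-indexing steps can be checked on a hypothetical two-row frame with out-of-order ISO-8601 timestamps (the sample values are made up for the example):

```python
import pandas as pd

# Hypothetical two-row frame with out-of-order ISO-8601 timestamps
demo = pd.DataFrame({'timestamp': ['2016-12-13T10:51:30.000+0530',
                                   '2016-12-13T10:51:28.545+0530'],
                     'temperature': [17.36, 17.35]})
# Parse the strings, move them to the index, drop the original column
demo['timestamp'] = pd.to_datetime(demo['timestamp'])
demo.index = demo['timestamp']
demo = demo.drop(['timestamp'], axis=1)
# Sorting the index puts the earlier reading first
demo.sort_index(inplace=True)
print(demo['temperature'].tolist())  # [17.35, 17.36]
```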

Calculate the z-score for each of the values and add it as a new column in the same DataFrame. The z-score is a standard score that indicates how many standard deviations an element is from the mean. A higher z-score represents a larger deviation from the mean, which can be interpreted as abnormal.


In [50]:
# calculate z-score and populate a new column
pandaDF['zscore'] = (pandaDF[valueHeaderName] - pandaDF[valueHeaderName].mean())/pandaDF[valueHeaderName].std(ddof=0)
'''
This function detects spikes and dips by returning "TRUE"
when the z-score is above 3 (spike) or below -3 (dip). If you
want to capture smaller spikes and dips, lower the threshold from
3 to 2 in this function.
'''
def spike(row):
    if row["zscore"] >= 3 or row["zscore"] <= -3:
        return "TRUE"
    else:
        return "FALSE"
 
pandaDF['anomaly'] = pandaDF.apply(spike, axis=1)
pandaDF = pandaDF.drop("zscore", axis=1)
pandaDF.head(n=5)


Out[50]:
                         temperature anomaly
timestamp
2016-12-13 04:21:42.871        17.47   FALSE
2016-12-13 04:21:44.610        17.47   FALSE
2016-12-13 04:21:46.120        17.44   FALSE
2016-12-13 04:21:47.808        17.44   FALSE
2016-12-13 04:21:49.648        17.38   FALSE
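As a quick sanity check of the z-score rule, here is the same computation on a hypothetical series of 20 normal readings plus one obvious spike (the values are made up for the example; only the spike crosses the threshold of 3):

```python
import pandas as pd

# Hypothetical series: 20 normal readings plus one obvious spike
demo = pd.DataFrame({'temperature': [17.4, 17.5] * 10 + [30.0]})
# Same z-score formula as above, using the population std (ddof=0)
demo['zscore'] = (demo['temperature'] - demo['temperature'].mean()) / demo['temperature'].std(ddof=0)

def spike(row):
    if row['zscore'] >= 3 or row['zscore'] <= -3:
        return "TRUE"
    return "FALSE"

demo['anomaly'] = demo.apply(spike, axis=1)
print(demo['anomaly'].tolist())  # 20 x 'FALSE' followed by one 'TRUE'
```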

Create a credentials object for storing the resultant DataFrame as a CSV file in the Object Storage.


In [64]:
# @hidden_cell
credentials_2 = {
  'auth_url':'https://identity.open.softlayer.com',
  'project':'object_storage_97449055_a948_4667_bca9_5991990f2420',
  'project_id':'557362dff79e4f92979dcee1dd7747ec',
  'region':'dallas',
  'user_id':'2a130fc399da4c46a9505509a3efaae0',
  'domain_id':'3f9e1d4d81ef447295c928e0c5edc185',
  'domain_name':'804347',
  'username':'member_7f033e289d99e2e3466f398fb962bd82e6e4993a',
  'password':"""vI^DY4=Hoy1gAlSY""",
  'container':'DSXandWatsonIoTIntegration',
  'tenantId':'undefined',
  'filename':'temp-data.csv'
}

Write the DataFrame to a CSV file. The following cells upload it to the Object Storage.


In [65]:
pandaDF.to_csv('processeddata.csv', index=False)
# Alternative: write directly from the Spark DataFrame
#cloudantdata.write.format('com.databricks.spark.csv').options(header='true').save(fileNameOut)
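Note that index=False writes only the data columns, so the timestamp index does not end up in the file; drop index=False if the timestamps should be kept. A quick round trip on a hypothetical frame (made up for the example) shows this:

```python
import pandas as pd

# Hypothetical frame indexed by timestamp, mirroring pandaDF above
demo = pd.DataFrame({'temperature': [17.47, 17.44]},
                    index=pd.to_datetime(['2016-12-13 04:21:42.871',
                                          '2016-12-13 04:21:44.610']))
demo.index.name = 'timestamp'
# index=False writes only the data columns; the timestamp index is omitted
demo.to_csv('demo.csv', index=False)
print(open('demo.csv').read())  # temperature / 17.47 / 17.44
```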

In [66]:
import requests
import json

def put_file(credentials, local_file_name):
    """Upload a local file to Bluemix Object Storage V3
    using the Swift API."""
    with open(local_file_name, 'r') as f:
        my_data = f.read()
    # Authenticate against the Identity (Keystone) V3 endpoint
    url1 = ''.join([credentials['auth_url'], '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
            'password': {'user': {'name': credentials['username'],
                                  'domain': {'id': credentials['domain_id']},
                                  'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    # Find the public object-store endpoint for the configured region
    for e1 in resp1_body['token']['catalog']:
        if e1['type'] == 'object-store':
            for e2 in e1['endpoints']:
                if e2['interface'] == 'public' and e2['region'] == credentials['region']:
                    url2 = ''.join([e2['url'], '/', credentials['container'], '/', local_file_name])
    # Use the issued token to PUT the file into the container
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.put(url=url2, headers=headers2, data=my_data)
    print(resp2)

In [67]:
put_file(credentials_2,'processeddata.csv')


<Response [201]>

In [ ]: