In [202]:
import requests
import pandas
import time
import json
from influxdb import InfluxDBClient
import numpy
def download_file(url):
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter
    r = requests.get(url, stream=True)
    with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
                f.flush()
    return local_filename

def LoadSiteIds(file="SiteIDs.json"):
    fp = open(file, "r")
    dic = json.load(fp)
    fp.close()
    return dic

def ParseSLBData(slb_id="h00t", start=time.time()-(24*60*60), stop=time.time()):
    starttime = time.strftime("%y%m%d%H%M", time.localtime(start))
    stoptime = time.strftime("%y%m%d%H%M", time.localtime(stop))
    url = "http://slb.nu/soldata/index.php?KEY=%s&start=%s&stop=%s" % (slb_id, starttime, stoptime)
    df = pandas.read_csv(url, sep=";", parse_dates=[[0, 1]], skiprows=8, header=None, infer_datetime_format=True, na_values=[" ", "  ", "   ", "    ", ""])
    cl = pandas.read_csv(url, sep=";", header=6, error_bad_lines=False, na_values=[""], nrows=1)
    # Align keys to data and rename time col.
    cols = cl.keys()
    cols = cols[2:]
    col2 = cols.insert(0, "Time")
    col2 = col2.insert(-1, "NAN")
    # Set data keys as column descriptors
    df.columns = col2
    # Delete trailing columns with junk.
    for key in df.keys()[-5:df.shape[1]-1]:
        if key.find(slb_id) == -1:
            del df[key]
    # Reformat timestamps; collect rows with malformed timestamps and drop
    # them afterwards, so row positions do not shift mid-iteration.
    bad_rows = []
    for i in range(0, df.shape[0]):
        try:
            timestamp = time.mktime(time.strptime(df["Time"][i], "%y-%m-%d %H:%M"))
            df["Time"][i] = timestamp
        except (ValueError, TypeError):
            bad_rows.append(df.index[i])
    df = df.drop(bad_rows)
    return df

def ParseCVS(file):
    fp = open(file, "r")
    df = pandas.read_csv(fp)
    print df
    print df['Line']

def SendToInfluxDB(df, FeedId, config_file="influx.json"):
    # Series name
    series = FeedId + "/raw_data"
    # Load database credentials
    fp = open(config_file, "r")
    config = json.load(fp)
    fp.close()
    # Connect
    client = InfluxDBClient(config["host"], config["port"], config["user"], config["password"], config["database"])
    # Save each row
    for i in range(0, df.shape[0]):
        timestamp = df.irow(i)[0]
        column = ["time"]
        data = [int(timestamp*1000)]
        # Skip NaNs
        for j in range(1, df.shape[1]):
            if numpy.isnan(df.iloc[i, j]):
                continue
            # Add key
            column.append(df.keys()[j])
            data.append(df.iloc[i, j])
        fdata = [{
            "points": [data],
            "name": series,
            "columns": column
        }]
        client.write_points_with_precision(fdata, "m")
    return
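Neither SiteIDs.json nor influx.json is shown in this notebook. A minimal sketch of the shapes the code above assumes, with key names inferred from how LoadSiteIds and SendToInfluxDB use them; all values here are placeholders:

# SiteIDs.json -- assumed shape: maps an SLB station key to a feed id
# (the notebook indexes SiteIDs["h00t"] and iterates over the keys):
#
#     {"h00t": "feed-id-for-h00t", "b01t": "feed-id-for-b01t"}
#
# influx.json -- assumed shape: credentials passed straight to InfluxDBClient:
#
#     {"host": "localhost", "port": 8086, "user": "root",
#      "password": "root", "database": "slbdata"}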
In [203]:
#ParseCVS("testdata/h00t_1310160720_1406030410.csv")
SiteIDs = LoadSiteIds()
print SiteIDs.keys()
In [228]:
x = 5
timestamp = 1402015728.10898
# Fetch a 30-week window ending x weeks before the reference timestamp.
Data = ParseSLBData(SiteIDs.keys()[0], timestamp-(24*60*60*7*(x+30)), timestamp-(24*60*60*7*x))
In [234]:
Data.shape
Out[234]:
In [235]:
FeedId = SiteIDs["h00t"]
config_file="influx.json"
In [236]:
#Series name
series = FeedId + "/raw_data"
#Load database credentials
fp = open(config_file,"r")
config = json.load(fp)
fp.close()
#Connect
client = InfluxDBClient(config["host"], config["port"], config["user"], config["password"], config["database"])
In [237]:
# Save each row
for i in range(0, Data.shape[0]):
    timestamp = Data.irow(i)[0]
    column = ["time"]
    data = [int(timestamp*1000)]
    # Skip NaNs
    for j in range(1, Data.shape[1]):
        if numpy.isnan(Data.iloc[i, j]):
            continue
        # Add key
        column.append(Data.keys()[j])
        data.append(Data.iloc[i, j])
    fdata = [{
        "points": [data],
        "name": series,
        "columns": column
    }]
    client.write_points_with_precision(fdata, "m")
In [222]:
time.time()
Out[222]:
In [260]:
client
Out[260]:
In [293]:
class InfluxFeedLTSInterface(InfluxDBClient):
    def __init__(self, config_file="influx.json"):
        # Load database credentials
        fp = open(config_file, "r")
        self.config = json.load(fp)
        fp.close()
        # Connect
        InfluxDBClient.__init__(self, self.config["host"], self.config["port"], self.config["user"], self.config["password"], self.config["database"])

    def GetLastTimeStamp(self, FluxId):
        result = self.query('select time from %s order desc limit 1;' % FluxId, time_precision='m')
        try:
            # Convert from milliseconds back to seconds.
            return float(result[0]["points"][0][0])/1000.0
        except (IndexError, KeyError, TypeError):
            # No data in the series yet.
            return 0.0

    def SendToInfluxDB(self, df, FeedId):
        # Series name
        series = FeedId + "/raw_data"
        # Save each row
        for i in range(0, df.shape[0]):
            timestamp = df.irow(i)[0]
            column = ["time"]
            data = [int(timestamp*1000)]
            # Skip NaNs
            for j in range(1, df.shape[1]):
                if numpy.isnan(df.iloc[i, j]):
                    continue
                # Add key
                column.append(df.keys()[j])
                data.append(df.iloc[i, j])
            # If there were only NaNs on this row, continue to the next row
            # ("time" is always present, so an all-NaN row has length 1).
            if len(column) == 1:
                continue
            fdata = [{
                "points": [data],
                "name": series,
                "columns": column
            }]
            self.write_points_with_precision(fdata, "m")
        return
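Because the series names contain a slash ("h00t/raw_data"), they must be double-quoted when interpolated into a query, which is why the cells below wrap FeedId in escaped quotes. A minimal usage sketch, assuming the SiteIDs loaded earlier and an influx.json as above:

Feeds = InfluxFeedLTSInterface()
FeedId = "\"%s/raw_data\"" % SiteIDs["h00t"]  # quoted: the series name contains "/"
print Feeds.GetLastTimeStamp(FeedId)          # 0.0 when the series is still empty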
In [271]:
#ParseCVS("testdata/h00t_1310160720_1406030410.csv")
SiteIDs = LoadSiteIds()
Feeds = InfluxFeedLTSInterface()
#Get all data until now + 1h
StopTime = time.time() + 3600
PeriodLen = 60*60*24*7
In [267]:
for Site in SiteIDs:
    FeedId = "\"%s/raw_data\"" % SiteIDs[Site]
    StartTime = Feeds.GetLastTimeStamp(FeedId)
    Current = StartTime
    while Current < StopTime:
        print "Reading SLB data from: " + Site
        Data = ParseSLBData(Site, Current, Current + PeriodLen)
        Current += PeriodLen
        print "Sending data to influx as: " + FeedId
        #Feeds.SendToInfluxDB(Data,FeedId)
In [308]:
#ParseCVS("testdata/h00t_1310160720_1406030410.csv")
SiteIDs = LoadSiteIds()
Feeds = InfluxFeedLTSInterface()
# Get all data until now + 1h
StopTime = time.time() + 3600
for Site in SiteIDs:
    FeedId = "\"%s/raw_data\"" % SiteIDs[Site]
    StartTime = Feeds.GetLastTimeStamp(FeedId)
    if StartTime == 0:
        print "No previous records in: " + FeedId
        print "Starting from Oct 2013"
        StartTime = time.mktime(time.strptime("2013-10-01", "%Y-%m-%d"))
    else:
        print "Last record in stream " + FeedId
        print "at: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(StartTime))
    # Start a tiny bit after the last value.
    Current = StartTime + 0.5
    PeriodLen = 60*60*24*7
    while Current < StopTime:
        print "Reading SLB data from: " + Site
        print "From: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(Current))
        print "To: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(Current + PeriodLen))
        Data = ParseSLBData(Site, Current, Current + PeriodLen)
        # Remove a first row that duplicates the last stored timestamp.
        if Data["Time"][0] == StartTime:
            Data = Data.drop(Data.index[0])
        Current += PeriodLen
        print "Sending data to influx as: " + FeedId
        #Feeds.SendToInfluxDB(Data,FeedId)
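For reuse outside the notebook, the backfill cell above could be folded into a single function. A sketch under the same assumptions, using only the helpers defined earlier; the name backfill_site is illustrative:

def backfill_site(Feeds, Site, FeedId, StopTime, PeriodLen=60*60*24*7):
    # Resume after the last stored value, or from Oct 2013 for an empty series.
    StartTime = Feeds.GetLastTimeStamp(FeedId)
    if StartTime == 0:
        StartTime = time.mktime(time.strptime("2013-10-01", "%Y-%m-%d"))
    Current = StartTime + 0.5
    while Current < StopTime:
        Data = ParseSLBData(Site, Current, Current + PeriodLen)
        # Drop a first row that duplicates the last stored timestamp.
        if Data.shape[0] > 0 and Data["Time"][0] == StartTime:
            Data = Data.drop(Data.index[0])
        Feeds.SendToInfluxDB(Data, FeedId)
        Current += PeriodLen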
In [298]:
StartTime
Out[298]:
In [302]:
Data = Data.drop(Data.index[0])
In [303]:
Data
Out[303]:
In [313]:
url = "http://www.slb.nu/soldata/index.php?KEY=b01t&start=1305310000&stop=1306080360"
In [314]:
df = pandas.read_csv(url, sep=";", parse_dates=[[0, 1]], skiprows=8, header=None, infer_datetime_format=True, na_values=[" ", "  ", "   ", "    ", ""])
cl = pandas.read_csv(url, sep=";", header=6, error_bad_lines=False, na_values=[""], nrows=1)
In [316]:
# Align keys to data and rename time col.
cols = cl.keys()
cols = cols[2:]
col2 = cols.insert(0, "Time")
col2 = col2.insert(-1, "NAN")
# Set data keys as column descriptors
df.columns = col2
# Delete trailing columns with junk.
slb_id = "b01t"  # station key from the URL above
for key in df.keys()[-5:df.shape[1]-1]:
    if key.find(slb_id) == -1:
        del df[key]
# Reformat timestamps; collect bad rows and drop them afterwards.
bad_rows = []
for i in range(0, df.shape[0]):
    try:
        timestamp = time.mktime(time.strptime(df["Time"][i], "%y-%m-%d %H:%M"))
        df["Time"][i] = timestamp
    except (ValueError, TypeError):
        bad_rows.append(df.index[i])
df = df.drop(bad_rows)
In [317]:
df
Out[317]:
In [318]:
url
Out[318]:
In [319]:
cl
Out[319]:
In [320]:
cl.keys()
Out[320]:
In [321]:
cols = cl.keys()
In [322]:
type(cols)
Out[322]:
In [324]:
# NB: a pandas Index is immutable, so this item assignment raises TypeError;
# the cells below build a new list instead.
cols[2] = cols[2].strip("b01t")
In [326]:
t = cols[2].strip("b01t")
In [328]:
type(cols)
Out[328]:
In [330]:
df.columns = [1,2,3]
In [334]:
# Remove SLB station id from key.
slb_id = "b01t"
NewCols = []
for datakey in cols:
    NewCols.append(datakey.strip(slb_id))
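One caveat: str.strip(slb_id) removes any leading or trailing characters from the set {'b', '0', '1', 't'}, not the literal substring, so a hypothetical key like "b01tOut" would come back as "Ou". A safer variant under the same assumption (the station id appears as a prefix); remove_station_prefix is an illustrative name:

def remove_station_prefix(key, slb_id):
    # Remove the literal station id prefix only, leaving other characters intact.
    if key.startswith(slb_id):
        return key[len(slb_id):]
    return key

NewCols = [remove_station_prefix(datakey, slb_id) for datakey in cols]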
In [335]:
NewCols
Out[335]:
In [336]:
cl
Out[336]:
In [337]:
cl.columns = NewCols
In [338]:
cl
Out[338]:
In [341]:
SiteIDs = LoadSiteIds()
In [342]:
SiteIDs
Out[342]:
In [ ]: