In [1]:
cd /root/git/SolarDataRESTfulAPI/
In [2]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>
# <codecell>
#cd git/SolarDataRESTfulAPI/
# <codecell>
import json
import pandas as pd
import InfluxDBInterface
import time
reload(InfluxDBInterface)
from ElasticsearchInterface import ESinterface
import sys
import mosquitto
import os
import argparse
def EpocToDate(timestamp):
    """Format a Unix epoch timestamp (seconds) as a local-time
    'YYYY-MM-DD HH:MM:SS' string."""
    local_time = time.localtime(timestamp)
    return time.strftime("%Y-%m-%d %H:%M:%S", local_time)
def SecToHMS(sec):
    """Render a duration in seconds as a human-readable 'H h M min S s' string.

    sec -- duration in seconds; floats are truncated to whole seconds.

    Fix: the original used plain '/' and relied on Python 2 integer
    division; under Python 3 that yields floats and produces a wrong
    minutes figure (e.g. 3661 s became '1 h 0 min 1 s').  divmod keeps
    the arithmetic integral on both interpreters.
    """
    sec = int(sec)
    hour, remainder = divmod(sec, 3600)
    minutes, secs = divmod(remainder, 60)
    return "%i h %i min %i s" % (hour, minutes, secs)
def RemoveResets(series):
    """Rebuild a monotone cumulative counter from a series that may
    contain resets: keep only the non-negative increments between
    consecutive samples, anchor at the first sample, and re-accumulate.
    """
    increments = series.diff().clip(0)  # drops to a lower value count as 0 gain
    increments.iloc[0] = series.iloc[0]  # diff() leaves NaN at position 0
    return increments.cumsum()
def CalculateProduction(Site,LogDB,ProductionDB,Recalculate=False):
    """Aggregate per-inverter log data for one site into total Power and
    reset-corrected cumulative Energy series, and upsert them into
    ProductionDB in chunks.

    Site         -- series name (site identifier) in the log database.
    LogDB        -- interface to the raw per-inverter log database.
    ProductionDB -- interface to the derived production database.
    Recalculate  -- False: resume from the last stored production value;
                    True: recompute from the start of the log.
    Returns the index of the last processed log row (an epoch timestamp,
    presumably in seconds -- TODO confirm against GetNextNRows).
    """
    #Create property lists
    # Columns matching "POWc" but not "Tot" are per-inverter energy counters;
    # columns matching "Pac" but not "Tot" are per-inverter power readings.
    EnergyProp = LogDB.GetPropertiesPartiallyMatchingAbutNotB(Site,"POWc","Tot")
    PowerProp = LogDB.GetPropertiesPartiallyMatchingAbutNotB(Site,"Pac","Tot")
    PreviousLastValidValue = 0
    PreviousLastValidValueTime = 0
    #Determine where to resume.
    if Recalculate == False:
        (PreviousLastValidValueTime,PreviousLastValidValue) = ProductionDB.GetLastValue(Site,"Energy")
        TimestampP = ProductionDB.GetLastTimestamp(Site,"Power")
        if (PreviousLastValidValueTime != None and TimestampP != None):
            #The start from where we have both power and energy values.
            if TimestampP < PreviousLastValidValueTime:
                PreviousLastValidValueTime = TimestampP
            # Stored timestamps appear to be in milliseconds; convert to
            # seconds for GetDataAfterTime -- TODO confirm.
            PreviousLastValidValueTime = PreviousLastValidValueTime / 1000
            print "\tResuming calculation from: %s" % EpocToDate(PreviousLastValidValueTime)
            #Get last data.
            dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,PreviousLastValidValueTime,1000)
        else:
            dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,None,1000)
            print "No previous data starting from first log data."
    else:
        #Get a log data chunck
        dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,None,1000)

    # Process the log in chunks (up to 1000 rows each); a chunk of one row
    # or fewer ends the loop.
    while (dfLog.shape[0] > 1):
        #Create a production frame.
        dfProduction = pd.DataFrame(columns = ["Power","Energy"])
        #Calculate power
        dfProduction["Power"] = dfLog[PowerProp].sum(axis=1)
        #Calculate energy
        dfPOWc = dfLog[EnergyProp]
        # Remove counter resets per inverter, then sum across inverters.
        dfProduction["Energy"] = dfPOWc.apply(RemoveResets).sum(axis=1)
        #Add offset from previus iteration.
        #Check if we have overlap. Is the last time the same as the smallest countervalue in the current array.
        FirstValidValueTime = dfProduction["Energy"].idxmin()
        #First time ever... or just NaN values in data.
        if PreviousLastValidValueTime == None or pd.isnull(FirstValidValueTime):
            offset = 0
        #Normal overlap
        else:
            # Shift this chunk so its smallest counter value lines up with
            # the last value carried over from the previous chunk.
            offset = PreviousLastValidValue - dfProduction["Energy"][FirstValidValueTime]
        dfProduction["Energy"] += offset
        #Update database
        ProductionDB.Replace(Site,dfProduction)
        #Keep track of counter max.
        MaxEnergyTime = dfProduction["Energy"].idxmax()
        if not pd.isnull(MaxEnergyTime):
            # Carry the running maximum forward as the offset anchor for
            # the next chunk.
            PreviousLastValidValue = dfProduction["Energy"][MaxEnergyTime]
            PreviousLastValidValueTime = MaxEnergyTime
        dfLog = LogDB.GetNextNRows(dfLog,1000)
    return dfLog.index[-1]
# <codecell>
In [3]:
reload(influxdb)
In [7]:
# Main driver cell: connect to MQTT, open the Influx databases, run
# CalculateProduction for each site, then publish status/last-update topics.
if True:
    #Parse arguments
    #parser = argparse.ArgumentParser(add_help=False)
    #parser.add_argument('-h', dest='host', default="localhost", help='MQTT host send results to')
    #parser.add_argument('-t', dest='topic', default="", help='MQTT topic to process')
    #parser.add_argument('-m', dest='message', default="", help='MQTT message to process')
    #args = parser.parse_args()
    #Get location of script
    path = os.path.abspath(os.path.dirname(sys.argv[0]))
    #Set up MQTT
    ip = "localhost"
    port = 1883
    user = "driver"
    # NOTE(review): hardcoded credentials -- move to a config file or
    # environment variables before sharing/committing.
    password = "1234"
    prefix = "SolarProductionProducer"
    mqtt=mosquitto.Mosquitto("ProductionProducer")
    mqtt.prefix = prefix
    mqtt.ip = ip
    mqtt.port = port
    #mqtt.clientId = clientId
    mqtt.user = user
    mqtt.password = password
    if mqtt != None:
        mqtt.username_pw_set(user,password)
        #mqtt.will_set( topic = "system/" + prefix, payload="Idle", qos=1, retain=True)
        mqtt.connect(ip,keepalive=10)
        # Announce that an update run is in progress.
        mqtt.publish(topic = "system/"+ prefix, payload="Updating", qos=1, retain=True)
    #Init resources
    # NOTE(review): absolute path to credentials file -- parameterize.
    DataLink = InfluxDBInterface.InfluxDBInterface("/root/MQTT-Stage/topics/solardata/Operator/SLB/lastupdate" + "/" + "influxInterfaceCredentials2.json")
    LogDB = DataLink.databases[u'SolarLogdata']
    ProductionDB = DataLink.databases[u'SolarProductionSites']
    #es = ESinterface()
    #Init vars
    Sites = LogDB.ListSeries()
    now = time.time()
    #Loop throug all sites.
    for Site in Sites:
        # NOTE(review): debugging leftover -- this 'break' exits on the
        # first iteration, so the per-site processing below never runs.
        # Remove it to process all sites.
        break
        print "Processing %s " % Site
        sys.stdout.flush()
        until = CalculateProduction(Site,LogDB,ProductionDB,False)
        # Report how far behind script start time the calculation got.
        until = int(now - until)
        hour = until / 3600
        minutes = (until - (hour * 3600))/60
        secs = until % 60
        print "\tFinnished processing up to %i hours %i minutes and %i seconds from script start time" % (hour,minutes,secs)
        sys.stdout.flush()
    print "Done"
    sys.stdout.flush()
    # Reconnect before publishing final status (keepalive may have lapsed
    # during the long calculation -- presumably; verify).
    mqtt.connect(ip,keepalive=10)
    #mqtt.publish(topic = "solardata/production/at", payload=str((TrailTime,LeadTime)), qos=1, retain=True)
    mqtt.publish(topic = "solardata/production/lastupdate", payload=now, qos=1, retain=True)
    mqtt.publish(topic = "system/"+ prefix, payload="Idle", qos=1, retain=True)
    time.sleep(0.5)
    del mqtt
In [24]:
ls ~/MQTT-Stage/topics/solardata/Operator/SLB/lastupdate/influxInterfaceCredentials2.json
In [26]:
# Debugging cells: an inline, step-by-step copy of CalculateProduction's
# body (resume logic + one chunk iteration) so intermediate frames can be
# inspected cell by cell.  Keep in sync with CalculateProduction above,
# or delete once debugging is done.
Recalculate=False
Site=Sites[0]
#Create property lists
EnergyProp = LogDB.GetPropertiesPartiallyMatchingAbutNotB(Site,"POWc","Tot")
PowerProp = LogDB.GetPropertiesPartiallyMatchingAbutNotB(Site,"Pac","Tot")
PreviousLastValidValue = 0
PreviousLastValidValueTime = 0
#Determine where to resume.
if Recalculate == False:
    (PreviousLastValidValueTime,PreviousLastValidValue) = ProductionDB.GetLastValue(Site,"Energy")
    TimestampP = ProductionDB.GetLastTimestamp(Site,"Power")
    if (PreviousLastValidValueTime != None and TimestampP != None):
        #The start from where we have both power and energy values.
        if TimestampP < PreviousLastValidValueTime:
            PreviousLastValidValueTime = TimestampP
        # Milliseconds -> seconds, as in CalculateProduction.
        PreviousLastValidValueTime = PreviousLastValidValueTime / 1000
        print "\tResuming calculation from: %s" % EpocToDate(PreviousLastValidValueTime)
        #Get last data.
        dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,PreviousLastValidValueTime,1000)
    else:
        dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,None,1000)
        print "No previous data starting from first log data."
else:
    #Get a log data chunck
    dfLog = LogDB.GetDataAfterTime(Site,EnergyProp + PowerProp,None,1000)

# One iteration of the chunk loop, flattened for inspection.
#Create a production frame.
dfProduction = pd.DataFrame(columns = ["Power","Energy"])
#Calculate power
dfProduction["Power"] = dfLog[PowerProp].sum(axis=1)
#Calculate energy
dfPOWc = dfLog[EnergyProp]
dfProduction["Energy"] = dfPOWc.apply(RemoveResets).sum(axis=1)
#Add offset from previus iteration.
#Check if we have overlap. Is the last time the same as the smallest countervalue in the current array.
FirstValidValueTime = dfProduction["Energy"].idxmin()
#First time ever... or just NaN values in data.
if PreviousLastValidValueTime == None or pd.isnull(FirstValidValueTime):
    offset = 0
#Normal overlap
else:
    offset = PreviousLastValidValue - dfProduction["Energy"][FirstValidValueTime]
dfProduction["Energy"] += offset
#Update database
ProductionDB.Replace(Site,dfProduction)
#Keep track of counter max.
MaxEnergyTime = dfProduction["Energy"].idxmax()
if not pd.isnull(MaxEnergyTime):
    PreviousLastValidValue = dfProduction["Energy"][MaxEnergyTime]
    PreviousLastValidValueTime = MaxEnergyTime
dfLog = LogDB.GetNextNRows(dfLog,1000)
#return dfLog.index[-1]
# More rows remain to process if this prints.
if (dfLog.shape[0] > 1):
    print "Ended"
In [10]:
dfLog
Out[10]:
In [12]:
dfProduction
Out[12]:
In [40]:
ProductionDB.Replace(Site,dfProduction)
In [32]:
ProductionDB.get_database_list()
In [34]:
ProductionDB._password
Out[34]:
In [ ]:
ProductionDB.query("delete from %s where time > %s and time < %s" % (Sites[0],"'2014-11-01'","now()"))
In [ ]:
Sites[0]
In [50]:
ProductionDB.ListSeries()
Out[50]:
In [70]:
try:
ProductionDB.GetFirstValue(Sites[0])
except Exception, err:
if err.message.find("400: Couldn't find series:") != -1:
print 0.0
else:
raise err
In [59]:
err
Out[59]:
In [61]:
err.args
Out[61]:
In [62]:
err.message
Out[62]:
In [75]:
LogDB.ListSeries()
Out[75]:
In [76]:
ret = LogDB.query("list series")
In [82]:
ret[0]["points"]
Out[82]:
In [84]:
for series in ret[0]["points"]:
print series[1]
In [90]:
LogDB.query("select * from \"%s\" limit 1" % '46d55815-f927-459f-a8e2-8bbcd88008ee' )
Out[90]:
In [88]:
"select * from '%s' limit 1" % '46d55815-f927-459f-a8e2-8bbcd88008ee'
Out[88]:
In [10]:
LogDB.GetLastTimeStamp("46d55815-f927-459f-a8e2-8bbcd88008ee")
In [9]:
LogDB.GetLastTimestamp("46d55815-f927-459f-a8e2-8bbcd88008ee")
Out[9]:
In [11]:
from SLB_DataImporter import InfluxFeedLTSInterface
In [12]:
InfluxFeedLTSInterface.GetLastTimeStamp(LogDB,"46d55815-f927-459f-a8e2-8bbcd88008ee")
In [13]:
from SLB_DataImporter import ParseSLBData
In [30]:
import time
ParseSLBData("f07t",time.time()-3600*24*11.5,time.time()-3600*24*10.5)
Out[30]:
In [ ]: