This notebook implements a workflow that analyses the correlation between a configurable number of variables. It produces a correlation matrix and, given a minimum correlation threshold, a graph whose edges link variables correlated above that threshold. Finally, it finds the graph's maximum cliques, which identify the groups of variables that are all mutually correlated above the threshold. The workflow can run over multiple iterations, with a parametrisable sampling rate and batch length for each variable.
The workflow specification and its parametrisation are inspired by the following research paper: https://www.cs.ubc.ca/~hoos/Publ/RosEtAl07.pdf
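To make the steps concrete, here is a minimal, self-contained sketch of the analysis (the 0.5 threshold and the series sizes are illustrative; the workflow below implements the same pipeline as streaming components):

#Minimal sketch: correlate random series, threshold the correlation
#matrix and report the maximum cliques of the resulting graph.
import numpy
import networkx as nx

series = numpy.random.uniform(0, 100, size=(3, 50))  #3 variables, 50 samples each
corr = numpy.corrcoef(series)                        #symmetric 3x3 correlation matrix
threshold = 0.5                                      #illustrative minimum correlation
H = nx.Graph()
H.add_nodes_from(range(3))
for i in range(3):
    for j in range(i + 1, 3):
        if corr[i, j] >= threshold:                  #keep only strong correlations
            H.add_edge(i, j)
cliques = list(nx.find_cliques(H))                   #all maximal cliques
order = max(len(c) for c in cliques)
print([c for c in cliques if len(c) == order])       #the maximum cliques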
In [1]:
%matplotlib inline
from dispel4py.workflow_graph import WorkflowGraph
from dispel4py.provenance import *
from dispel4py.new.processor import *
import time
import random
import numpy
import traceback
from dispel4py.base import create_iterative_chain, GenericPE, ConsumerPE, IterativePE, SimpleFunctionPE
from dispel4py.new.simple_process import process_and_return
from dispel4py.visualisation import display
import IPython
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats.stats import pearsonr
import networkx as nx
from itertools import combinations
sns.set(style="white")
class Start(GenericPE):
def __init__(self):
GenericPE.__init__(self)
self._add_input('iterations')
self._add_output('output')
#self.prov_cluster="myne"
def _process(self,inputs):
if 'iterations' in inputs:
inp=inputs['iterations']
self.write('output',inp,metadata={'iterations':inp})
#Uncomment this line to associate this PE to the mycluster provenance-cluster
#self.prov_cluster ='mycluster'
class Source(GenericPE):
def __init__(self,sr,index,batchsize):
GenericPE.__init__(self)
self._add_input('iterations')
self._add_output('output')
self.sr=sr
self.var_index=index
self.batchsize=batchsize
#self.prov_cluster="myne"
self.parameters={'sampling_rate':sr,'batchsize':batchsize}
#Uncomment this line to associate this PE to the mycluster provenance-cluster
#self.prov_cluster ='mycluster'
def _process(self,inputs):
if 'iterations' in inputs:
iteration=inputs['iterations']
batch=[]
it=1
#Streams out batches of self.batchsize values at a 1/self.sr sampling rate, for 'iteration' iterations
while (it<=iteration):
while (len(batch)<self.batchsize):
val=random.uniform(0,100)
time.sleep(1/self.sr)
batch.append(val)
self.write('output',(it,self.var_index,batch),metadata={'var':self.var_index,'iteration':it,'batch':batch})
batch=[]
it+=1
class MaxClique(GenericPE):
def __init__(self,threshold):
GenericPE.__init__(self)
self._add_input('input')
self._add_output('graph')
self._add_output('clique')
self.threshold=threshold
#self.prov_cluster="myne"
self.parameters={'threshold':threshold}
#Uncomment this line to associate this PE to the mycluster provenance-cluster
#self.prov_cluster ='mycluster'
def _process(self,inputs):
if 'input' in inputs:
matrix=inputs['input'][0]
iteration=inputs['input'][1]
low_values_indices = matrix < self.threshold # Where values are low
matrix[low_values_indices] = 0
#plt.figure('graph_'+str(iteration))
H = nx.from_numpy_matrix(matrix)
fig = plt.figure('graph_'+str(iteration))
text = "Iteration "+str(iteration)+" "+"graph"
labels = {i : i for i in H.nodes()}
pos = nx.circular_layout(H)
nx.draw_circular(H)
nx.draw_networkx_labels(H, pos, labels, font_size=15)
fig.text(.1,.1,text)
self.write('graph',matrix,metadata={'graph':str(matrix),'batch':iteration})
cliques = list(nx.find_cliques(H))
fign=0
maxcnumber=0
maxclist=[]
for nodes in cliques:
if (len(nodes)>maxcnumber):
maxcnumber=len(nodes)
for nodes in cliques:
if (len(nodes)==maxcnumber):
maxclist.append(nodes)
for nodes in maxclist:
edges = combinations(nodes, 2)
C = nx.Graph()
C.add_nodes_from(nodes)
C.add_edges_from(edges)
fig = plt.figure('clique_'+str(iteration)+str(fign))
text = "Iteration "+str(iteration)+" "+"clique "+str(fign)
fign+=1
labels = {i : i for i in C.nodes()}
pos = nx.circular_layout(C)
nx.draw_circular(C)
nx.draw_networkx_labels(C, pos, labels, font_size=15)
fig.text(.1,.1,text)
self.write('clique',cliques,metadata={'clique':str(nodes),'iteration':iteration, 'order':maxcnumber,'prov:type':"hft:products"}, location="file://cliques/")
class CorrMatrix(GenericPE):
def __init__(self,variables_number):
GenericPE.__init__(self)
self._add_input('input',grouping=[0])
self._add_output('output')
self.size=variables_number
self.parameters={'variables_number':variables_number}
self.data={}
#Uncomment this line to associate this PE to the mycluster provenance-cluster
#self.prov_cluster ='mycluster'
def _process(self,inputs):
for x in inputs:
if inputs[x][0] not in self.data:
#prepares the data to visualise the cross-correlation matrix of a specific batch number.
self.data[inputs[x][0]]={}
self.data[inputs[x][0]]['matrix']=numpy.identity(self.size)
self.data[inputs[x][0]]['ro_count']=0
#stores rho in both symmetric entries of the matrix
self.data[inputs[x][0]]['matrix'][inputs[x][1][0]-1,inputs[x][1][1]-1]=inputs[x][2]
self.data[inputs[x][0]]['matrix'][inputs[x][1][1]-1,inputs[x][1][0]-1]=inputs[x][2]
self.data[inputs[x][0]]['ro_count']+=1
if self.data[inputs[x][0]]['ro_count']==(self.size*(self.size-1))/2:
matrix=self.data[inputs[x][0]]['matrix']
d = pd.DataFrame(data=matrix,
columns=range(0,self.size),index=range(0,self.size))
mask = numpy.zeros_like(d, dtype=bool)
mask[numpy.triu_indices_from(mask)] = True
# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))
# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)
# Draw the heatmap with the mask and correct aspect ratio
sns.heatmap(d, mask=mask, cmap=cmap, vmax=1,
square=True,
linewidths=.5, cbar_kws={"shrink": .5}, ax=ax)
plt.show()
self.log('\r\n'+str(matrix))
self.write('output',(matrix,inputs[x][0]),metadata={'matrix':str(d),'iteration':str(inputs[x][0])})
##dep=['iter_'+str(inputs[x][0])])
class CorrCoef(GenericPE):
def __init__(self):
GenericPE.__init__(self)
#self._add_input('input1',grouping=[0])
#self._add_input('input2',grouping=[0])
self._add_output('output')
self.data={}
def _process(self, inputs):
index=None
val=None
for x in inputs:
if inputs[x][0] not in self.data:
self.data[inputs[x][0]]=[]
for y in self.data[inputs[x][0]]:
ro=numpy.corrcoef(y[2],inputs[x][2])
self.write('output',(inputs[x][0],(y[1],inputs[x][1]),ro[0][1]),metadata={'iteration':inputs[x][0],'vars':str(y[1])+"_"+str(inputs[x][1]),'rho':ro[0][1]},dep=['var_'+str(y[1])+"_"+str(y[0])])
#appends var_index and value
self.data[inputs[x][0]].append(inputs[x])
#self.log(self.data[inputs[x][0]])
self.update_prov_state('var_'+str(inputs[x][1])+"_"+str(inputs[x][0]),None,metadata={'var_'+str(inputs[x][1]):inputs[x][2]}, ignore_inputs=True)
In [2]:
variables_number=3
sampling_rate=100
batch_size=3
iterations=2
input_data = {"Start": [{"iterations": iterations}]}
# Instantiates the Workflow Components
# and generates the graph based on parameters
def createWf():
graph = WorkflowGraph()
mat=CorrMatrix(variables_number)
mat.prov_cluster='record2'
mc = MaxClique(-0.5)
mc.prov_cluster='record0'
start=Start()
start.prov_cluster='record0'
sources={}
cc=CorrCoef()
cc.prov_cluster='record1'
stock=['hft:NASDAQ','hft:MIBTEL','hft:DOWJ']
for i in range(1,variables_number+1):
sources[i] = Source(sampling_rate,i,batch_size)
sources[i].prov_cluster=stock[i%len(stock)-1] #assigns each source to one of the stock clusters in turn
sources[i].numprocesses=1
for h in range(1,variables_number+1):
cc._add_input('input'+'_'+str(h),grouping=[0])
graph.connect(start,'output',sources[h],'iterations')
graph.connect(sources[h],'output',cc,'input'+'_'+str(h))
graph.connect(cc,'output',mat,'input')
graph.connect(mat,'output',mc,'input')
return graph
In [3]:
#Launch in simple process
print ("Preparing for: "+str(iterations)+" projections" )
graph=createWf()
start_time = time.time()
#Uncomment to run the workflow without provenance
#process_and_return(graph, input_data)
elapsed_time = time.time() - start_time
print ("ELAPSED TIME: "+str(elapsed_time))
Type for contextual metadata:

- StockType: defines the metadata properties to be extracted from the data produced on a specific output port. Its extractItemMetadata method returns the dictionary (or list of dictionaries) stored as contextual metadata for each element of the computation.

Types capturing provenance patterns:

- ASTGrouped (Accumulate State Trace Grouped): manages a stateful operator with grouping rules.
- IntermediateStatefulOut: a stateful component that produces distinct but interdependent outputs.

A minimal skeleton of the extension points these types override is sketched below, ahead of their concrete implementations.
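The following skeleton is illustrative only (the class name is hypothetical); it shows the two hooks a ProvenanceType subclass overrides, with the same signatures as the implementations in the next cell.

class MySketchType(ProvenanceType):
    def __init__(self):
        ProvenanceType.__init__(self)

    #Returns the contextual metadata (a dictionary, or list of dictionaries)
    #for each data element written on 'port'; returning None suppresses capture.
    def extractItemMetadata(self, data, port):
        return [{'val': str(data)}]

    #Invoked on 'write' and 'end_invocation_event' events; shapes the lineage
    #pattern through calls such as discardInFlow(), ignorePastFlow() and
    #update_prov_state(), as the concrete types below demonstrate.
    def apply_derivation_rule(self, event, voidInvocation,
                              iport=None, oport=None, data=None, metadata=None):
        pass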
In [4]:
class StockType(ProvenanceType):
def __init__(self):
ProvenanceType.__init__(self)
self.streammeta=[]
self.count=1
self.addNamespacePrefix("hft","http://hft.eu/ns/#")
def makeUniqueId(self,**kwargs):
#produce the id
id=str(uuid.uuid1())
#Store here the id into the data (type specific):
if 'data' in kwargs:
data=kwargs['data']
#Return
return id
def extractItemMetadata(self,data,port):
try:
metadata=None
self.embed=True
self.streammeta.append({'val':str(data)})
if (self.count%1==0): #always true with modulus 1; raise it to emit metadata every n-th item
metadata=deepcopy(self.streammeta)
self.provon=True
self.streammeta=[]
else:
self.provon=False
self.count+=1
return metadata
except Exception:
self.log("Applying default metadata extraction:"+str(traceback.format_exc()))
self.error=self.error+"Extract Metadata error: "+str(traceback.format_exc())
return super(StockType, self).extractItemMetadata(data,port);
class ASTGrouped(ProvenanceType):
def __init__(self):
ProvenanceType.__init__(self)
def apply_derivation_rule(self,event,voidInvocation,iport=None,oport=None,data=None,metadata=None):
if (event=='write'):
vv=str(abs(make_hash(tuple([self.getInputAt(port=iport,index=x) for x in self.inputconnections[iport]['grouping']]))))
self.setStateDerivations([vv])
if (event=='end_invocation_event' and voidInvocation==False):
self.discardInFlow()
self.discardState()
if (event=='end_invocation_event' and voidInvocation==True):
if data!=None:
vv=str(abs(make_hash(tuple([self.getInputAt(port=iport,index=x) for x in self.inputconnections[iport]['grouping']]))))
self.ignorePastFlow()
self.update_prov_state(vv,data,metadata={"LOOKUP":str(vv)},dep=[vv])
self.discardInFlow()
self.discardState()
class IntermediateStatefulOut(ProvenanceType):
def __init__(self):
ProvenanceType.__init__(self)
def apply_derivation_rule(self,event,voidInvocation,iport=None,oport=None,data=None,metadata=None):
self.ignore_past_flow=False
self.ignore_inputs=False
self.stateful=False
if (event=='write' and oport == self.STATEFUL_PORT):
self.update_prov_state(self.STATEFUL_PORT,data,metadata=metadata)
if (event=='write' and oport != self.STATEFUL_PORT):
self.ignorePastFlow()
if (event=='end_invocation_event' and voidInvocation==False):
self.discardInFlow()
self.discardState()
Once the provenance types have been defined, they are used to configure a workflow execution so that it complies with the desired provenance-collection requirements. Below we illustrate the framework's configuration method and the details of this approach.
In [5]:
#Definition of metadata driven selectivity rules
selrule1 = {"CorrCoef": {
"rules":{
"rho": {
"$gt": 0 }
}
}
}
#Excludes the start component from the trace
selrule2 = {"Start": {
"rules":{
"iterations": {
"$lt": 0 }
}
}
}
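#The selectivity rules use MongoDB-style comparison operators on metadata keys:
#a trace entry is retained only when the predicate holds. The evaluator below is
#purely illustrative (a sketch of the intended semantics, not the library's code).
_OPS = {'$gt': lambda value, bound: value > bound,
        '$lt': lambda value, bound: value < bound}
def _rule_matches(rules, metadata):
    return all(_OPS[op](metadata[key], bound)
               for key, conditions in rules.items()
               for op, bound in conditions.items())
#selrule2 can never match, since 'iterations' is always positive, so the
#Start component is excluded from the trace:
#_rule_matches(selrule2['Start']['rules'], {'iterations': 2}) -> False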
#Configuration setup of the provenance execution of the run
prov_config = {
"provone:User": "aspinuso",
"s-prov:description": " correlation of four continuous variables",
"s-prov:workflowName": "CAW",
"s-prov:workflowType": "hft:CorrelationAnalysis",
"s-prov:workflowId": "190341",
"s-prov:description": "CAW Test Case",
"s-prov:save-mode": "service",
"s-prov:componentsType":
{"MaxClique": {"s-prov:type": (IntermediateStatefulOut,),
"s-prov:stateful-port": "graph",
"s-prov:prov-cluster": "hft:StockAnalyser"},
"CorrMatrix": {"s-prov:type": (ASTGrouped, StockType,),
"s-prov:prov-cluster": "hft:StockAnalyser"},
"CorrCoef": {"s-prov:type": (SingleInvocationFlow,),
"s-prov:prov-cluster": "hft:Correlator"}
},
"s-prov:sel-rules": selrule2
}
In [6]:
#Store via service
ProvenanceType.REPOS_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/insert'
#Export data lineage via service (REST GET Call on dataid resource /data/<id>/export)
ProvenanceType.PROV_EXPORT_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/'
#Store to local path
ProvenanceType.PROV_PATH='./prov-files/'
#Size of the provenance bulk before it is sent to storage or to the sensor
ProvenanceType.BULK_SIZE=20
rid=None
def createGraphWithProv():
graph=createWf()
# Randomly generated unique identifier for the current run
rid='CAW_JUP_'+getUniqueId()
# Finally, the provenance-enhanced graph is prepared:
#Initialise provenance storage and associate a Provenance type with specific components:
configure_prov_run(graph,None,
provImpClass=(SingleInvocationFlow,),
username=prov_config["provone:User"],
runId=rid,
input=[{'name':'variables_number','val':variables_number},
{'name':'sampling_rate','val':sampling_rate},
{'name':'batch_size','val':batch_size},
{'name':'iterations','val':iterations}],
description=prov_config["s-prov:description"],
workflowName=prov_config["s-prov:workflowName"],
workflowType=prov_config["s-prov:workflowType"],
workflowId="!23123",
componentsType= prov_config["s-prov:componentsType"],
save_mode=prov_config["s-prov:save-mode"],
sel_rules=prov_config["s-prov:sel-rules"],
)
return (graph,rid)
graph,rid=createGraphWithProv()
display(graph)
In [7]:
#Launch in simple process
start_time = time.time()
process_and_return(graph, input_data)
#Example MPI launch: mpiexec -n 6 dispel4py mpi dispel4py.examples.graph_testing.pipeline_test
elapsed_time = time.time() - start_time
print ("ELAPSED TIME: "+str(elapsed_time))
In [8]:
import xml.etree.ElementTree as ET
print("Extract Trace for run id "+rid)
expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
connection = httplib.client.HTTPConnection(expurl.netloc)
url= "http://"+expurl.netloc+expurl.path+rid+"/export?format=xml"
print(url)
connection.request(
"GET", url)
response = connection.getresponse()
print("progress: " + str((response.status, response.reason)))
prov1 = ET.fromstring(response.read())
print('PROV TO EMBED:')
print(str(prov1))
In [9]:
import prov
from IPython.display import Image
import io
from prov.model import ProvDocument, ProvBundle, ProvException, first, Literal
from prov.dot import prov_to_dot
def provURLToPNG(xml,format):
xml_doc = io.BytesIO(urllib.request.urlopen(xml).read())
doc=ProvDocument.deserialize(xml_doc,format="xml")
dot = prov_to_dot(doc)
if format=="png":
dot.write_png('PROV.png')
return 'PROV.png'
if format=="svg":
dot.write_svg('PROV.svg')
return 'PROV.svg'
expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
connection = httplib.client.HTTPConnection(expurl.netloc)
url= "http://"+expurl.netloc+expurl.path+rid+"/export?all=true"
print(url)
png_content=provURLToPNG(url,"png")
Image(png_content)
Out[9]:
In [10]:
class ProvenanceSensorToService(ProvenanceRecorder):
def __init__(self, name='ProvenanceRecorderToService', toW3C=False):
ProvenanceRecorder.__init__(self)
self.name = name
self.numprocesses=2
self.convertToW3C = toW3C
def _postprocess(self):
self.connection.close()
def _preprocess(self):
self.provurl = urlparse(ProvenanceRecorder.REPOS_URL)
self.connection = httplib.HTTPConnection(
self.provurl.netloc)
def sendToService(self,prov):
params = urllib.parse.urlencode({'prov': ujson.dumps(prov)})
headers = {
"Content-type": "application/x-www-form-urlencoded",
"Accept": "application/json"}
self.connection.request(
"POST",
self.provurl.path,
params,
headers)
response = self.connection.getresponse()
self.log("Postprocress: " +
str((response.status, response.reason, response)))
self.connection.close()
def process(self, inputs):
try:
for x in inputs:
prov = inputs[x]
if "_d4p" in prov:
prov = prov["_d4p"]
elif "provenance" in prov:
prov = prov["provenance"]
#self.log(prov)
self.sendToService(prov)
except Exception:
self.log(traceback.format_exc())
In [ ]:
class MyProvenanceSensorWithFeedback(ProvenanceRecorder):
def __init__(self, toW3C=False):
ProvenanceRecorder.__init__(self)
self.convertToW3C = toW3C
self.bulk = []
self.timestamp = datetime.datetime.utcnow()
def _preprocess(self):
self.provurl = urlparse(ProvenanceRecorder.REPOS_URL)
self.connection = httplib.HTTPConnection(
self.provurl.netloc)
def postprocess(self):
self.connection.close()
def _process(self, inputs):
prov = None
for x in inputs:
prov = inputs[x]
out = None
if isinstance(prov, list) and "data" in prov[0]:
prov = prov[0]["data"]
if self.convertToW3C:
out = toW3Cprov(prov)
else:
out = prov
self.write(self.porttopemap[prov['name']], "FEEDBACK MESSAGE FROM RECORDER")
self.bulk.append(out)
params = urllib.parse.urlencode({'prov': json.dumps(self.bulk)})
headers = {
"Content-type": "application/x-www-form-urlencoded",
"Accept": "application/json"}
self.connection.request(
"POST", self.provurl.path, params, headers)
response = self.connection.getresponse()
self.log("progress: " + str((response.status, response.reason,
response, response.read())))
return None