Correlation Analysis Workflow with Active Provenance in dispel4py

Correlation Analysis Workflow (CAW): Description and Components


This notebook implements a workflow that analyses the correlation between a configurable number of variables. It produces a correlation matrix and, given a minimum correlation threshold, a corresponding graph. Finally, it finds the graph's max cliques, which show all the variables that are correlated above the threshold. The workflow can run over multiple iterations, with a parametrisable sampling rate and length of the variables' batches.

The workflow specification and its parametrisation are inspired by the following research paper: https://www.cs.ubc.ca/~hoos/Publ/RosEtAl07.pdf
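
To fix intuition before the dispel4py implementation, here is a standalone sketch of the computation that the workflow distributes (plain numpy/networkx; the three variables, the batch length of 100 and the 0.5 threshold are illustrative choices, not taken from the workflow):

    import numpy
    import networkx as nx
    
    # Three synthetic variables, one batch of 100 random samples each
    batches = [numpy.random.uniform(0, 100, 100) for _ in range(3)]
    
    # Pearson correlation matrix (numpy.corrcoef treats each row as a variable)
    matrix = numpy.corrcoef(batches)
    
    # Zero-out correlations below the threshold, as the MaxClique component does
    threshold = 0.5
    matrix[matrix < threshold] = 0
    
    # Drop self-correlations so the thresholded matrix yields a simple graph
    numpy.fill_diagonal(matrix, 0)
    
    # Interpret the matrix as a graph and list its maximal cliques
    H = nx.from_numpy_matrix(matrix)
    print(list(nx.find_cliques(H)))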

Components


  • 1 - Start: Root node of the graph. It broadcasts the sampling-iteration number to the Source components
  • 2 - Source: Produces random numbers in the range [0, 100] at a specified sampling-rate and organises them in batches
  • 3 - CorrCoef: Calculates the Pearson's correlation coefficient of the batches coming from two sources
  • 4 - CorrMatrix: Produces and visualises the correlation matrix for all sources, for each sampling-iteration
  • 5 - MaxClique: Transforms the correlation matrix into a graph, according to a correlation minimum threshold and computes the graph’s max cliques

    In [1]:
    %matplotlib inline
    
    
    from dispel4py.workflow_graph import WorkflowGraph 
    from dispel4py.provenance import *
    from dispel4py.new.processor  import *
    import time
    import random
    import numpy
    import traceback 
    from dispel4py.base import create_iterative_chain, GenericPE, ConsumerPE, IterativePE, SimpleFunctionPE
    from dispel4py.new.simple_process import process_and_return
    from dispel4py.visualisation import display
    import IPython
    import pandas as pd
    import seaborn as sns
    import matplotlib.pyplot as plt
    from scipy.stats import pearsonr 
    import networkx as nx
    from itertools import combinations
    
    sns.set(style="white")
    
    
    class Start(GenericPE):
    
        def __init__(self):
            GenericPE.__init__(self)
            self._add_input('iterations')
            self._add_output('output')
            #self.prov_cluster="myne"
        
        def _process(self,inputs):
            
            if 'iterations' in inputs:
                inp=inputs['iterations']
                 
                self.write('output',inp,metadata={'iterations':inp})
                
            #Uncomment this line to associate this PE to the mycluster provenance-cluster 
            #self.prov_cluster ='mycluster'
    
    class Source(GenericPE):
    
        def __init__(self,sr,index,batchsize):
            GenericPE.__init__(self)
            self._add_input('iterations')
            self._add_output('output')
            self.sr=sr
            self.var_index=index
            self.batchsize=batchsize
            #self.prov_cluster="myne"
             
            self.parameters={'sampling_rate':sr,'batchsize':batchsize}
            
            #Uncomment this line to associate this PE to the mycluster provenance-cluster 
            #self.prov_cluster ='mycluster'
            
        
        def _process(self,inputs):
             
            if 'iterations' in inputs:
                iteration=inputs['iterations']
            
            
            batch=[]
            it=1
            #Streams out batches at a 1/self.sr sampling rate, for 'iteration' iterations
            while (it<=iteration):
                while (len(batch)<self.batchsize):
                    val=random.uniform(0,100)
                    time.sleep(1/self.sr)
                    batch.append(val)
                    
                self.write('output',(it,self.var_index,batch),metadata={'var':self.var_index,'iteration':it,'batch':batch})
                batch=[]
                it+=1
            
    
    class MaxClique(GenericPE):
    
        def __init__(self,threshold):
            GenericPE.__init__(self)
            self._add_input('input')
            self._add_output('graph')
            self._add_output('clique')
            self.threshold=threshold
            #self.prov_cluster="myne"
             
            self.parameters={'threshold':threshold}
            
                    
            #Uncomment this line to associate this PE to the mycluster provenance-cluster 
            #self.prov_cluster ='mycluster'
            
        
        def _process(self,inputs):
             
            if 'input' in inputs:
                matrix=inputs['input'][0]
                iteration=inputs['input'][1]
            
            #Zero-out the correlations below the threshold
            low_values_indices = matrix < self.threshold  # Where values are low
            matrix[low_values_indices] = 0 
            
            #Build a graph from the thresholded matrix and draw it
            H = nx.from_numpy_matrix(matrix)
            fig = plt.figure('graph_'+str(iteration))
            text = "Iteration "+str(iteration)+" "+"graph"
             
            labels = {i : i for i in H.nodes()}
            pos = nx.circular_layout(H)
            nx.draw_circular(H)
            nx.draw_networkx_labels(H, pos, labels, font_size=15)
            fig.text(.1,.1,text)
            self.write('graph',matrix,metadata={'graph':str(matrix),'batch':iteration})
            
            #Find all maximal cliques and keep those of maximum order
            cliques = list(nx.find_cliques(H))
            
            fign=0
            maxcnumber=0
            maxclist=[]
            
            for nodes in cliques:
                if (len(nodes)>maxcnumber):
                    maxcnumber=len(nodes)
                    
            for nodes in cliques:
                if (len(nodes)==maxcnumber):
                    maxclist.append(nodes)
    
            for nodes in maxclist:    
                edges = combinations(nodes, 2)
                C = nx.Graph()
                C.add_nodes_from(nodes)
                C.add_edges_from(edges)
                fig = plt.figure('clique_'+str(iteration)+str(fign))
                text = "Iteration "+str(iteration)+" "+"clique "+str(fign)
                fign+=1
                labels = {i : i for i in C.nodes()}
                pos = nx.circular_layout(C)
                nx.draw_circular(C)
                nx.draw_networkx_labels(C, pos, labels, font_size=15)
                fig.text(.1,.1,text)
                self.write('clique',cliques,metadata={'clique':str(nodes),'iteration':iteration, 'order':maxcnumber,'prov:type':"hft:products"}, location="file://cliques/")
        
                  
             
            
            
    
    class CorrMatrix(GenericPE):
    
        def __init__(self,variables_number):
            GenericPE.__init__(self)
            self._add_input('input',grouping=[0]) 
            self._add_output('output')
            self.size=variables_number
            self.parameters={'variables_number':variables_number}
            self.data={}
             
            
            #Uncomment this line to associate this PE to the mycluster provenance-cluster 
            #self.prov_cluster='mycluster'
                
        def _process(self,inputs):
            for x in inputs:
                
                if inputs[x][0] not in self.data:
                    #prepares the data to visualise the xcor matrix of a specific batch number.
                    self.data[inputs[x][0]]={}
                    self.data[inputs[x][0]]['matrix']=numpy.identity(self.size)
                    self.data[inputs[x][0]]['ro_count']=0
                
                #Stores rho at both symmetric cells of the matrix
                if (inputs[x][1][0]>inputs[x][1][1]):
                    self.data[inputs[x][0]]['matrix'][inputs[x][1][0]-1,inputs[x][1][1]-1]=inputs[x][2]
                if (inputs[x][1][0]<inputs[x][1][1]):
                    self.data[inputs[x][0]]['matrix'][inputs[x][1][1]-1,inputs[x][1][0]-1]=inputs[x][2]
                    
                self.data[inputs[x][0]]['matrix'][inputs[x][1][0]-1,inputs[x][1][1]-1]=inputs[x][2]
                self.data[inputs[x][0]]['ro_count']+=1
                  
                if self.data[inputs[x][0]]['ro_count']==(self.size*(self.size-1))/2:
                    matrix=self.data[inputs[x][0]]['matrix']
                    
                    d = pd.DataFrame(data=matrix,
                     columns=range(0,self.size),index=range(0,self.size))
                    
                    mask = numpy.zeros_like(d, dtype=bool)
                    mask[numpy.triu_indices_from(mask)] = True
    
                    # Set up the matplotlib figure
                    f, ax = plt.subplots(figsize=(11, 9))
    
                    # Generate a custom diverging colormap
                    cmap = sns.diverging_palette(220, 10, as_cmap=True)
    
                    # Draw the heatmap with the mask and correct aspect ratio
                    sns.heatmap(d, mask=mask, cmap=cmap, vmax=1,
                        square=True,
                        linewidths=.5, cbar_kws={"shrink": .5}, ax=ax)
                    
                    plt.show()   
                    self.log('\r\n'+str(matrix))
                    self.write('output',(matrix,inputs[x][0]),metadata={'matrix':str(d),'iteration':str(inputs[x][0])})
                    ##dep=['iter_'+str(inputs[x][0])])
    
    
                
    class CorrCoef(GenericPE):
    
        def __init__(self):
            GenericPE.__init__(self)
            #self._add_input('input1',grouping=[0])
            #self._add_input('input2',grouping=[0])
            self._add_output('output')
            self.data={}
            
            
             
            
        def _process(self, inputs):
            index=None
            val=None
                  
            for x in inputs:
                if inputs[x][0] not in self.data:
                    self.data[inputs[x][0]]=[]
                    
                #Correlates the incoming batch with every batch already
                #received for the same iteration
                for y in self.data[inputs[x][0]]:
                     
                    ro=numpy.corrcoef(y[2],inputs[x][2])
                     
                    self.write('output',(inputs[x][0],(y[1],inputs[x][1]),ro[0][1]),metadata={'iteration':inputs[x][0],'vars':str(y[1])+"_"+str(inputs[x][1]),'rho':ro[0][1]},dep=['var_'+str(y[1])+"_"+str(y[0])])
                     
                
                #appends var_index and value
                self.data[inputs[x][0]].append(inputs[x])
                #self.log(self.data[inputs[x][0]])
                self.update_prov_state('var_'+str(inputs[x][1])+"_"+str(inputs[x][0]),None,metadata={'var_'+str(inputs[x][1]):inputs[x][2]}, ignore_inputs=True)
    

    Definition of the workflow

    We now define the workflow and its parametrisation.

    
    
    In [2]:
    variables_number=3
    sampling_rate=100
    batch_size=3
    iterations=2
    
    input_data = {"Start": [{"iterations": iterations}]}
          
    # Instantiates the Workflow Components  
    # and generates the graph based on parameters
    
    def createWf():
        graph = WorkflowGraph()
        mat=CorrMatrix(variables_number)
        mat.prov_cluster='record2'
        mc = MaxClique(-0.5)
        mc.prov_cluster='record0'
        start=Start()
        start.prov_cluster='record0'
        sources={}
         
    
        
        
        cc=CorrCoef()
        cc.prov_cluster='record1'
        stock=['hft:NASDAQ','hft:MIBTEL','hft:DOWJ']
          
        for i in range(1,variables_number+1):
            sources[i] = Source(sampling_rate,i,batch_size)
            sources[i].prov_cluster=stock[i%len(stock)-1]
             
             
            sources[i].numprocesses=1
             
    
        for h in range(1,variables_number+1):
            cc._add_input('input'+'_'+str(h+1),grouping=[0])
            graph.connect(start,'output',sources[h],'iterations')
            graph.connect(sources[h],'output',cc,'input'+'_'+str(h+1))
            
        
        graph.connect(cc,'output',mat,'input')
        graph.connect(mat,'output',mc,'input')
        
            
      
        return graph
    

    Execution without provenance

    The following script executes the workflow in single-process mode without provenance (the process_and_return call is commented out here; uncomment it to actually run the graph).

    
    
    In [3]:
    #Launch in simple process
    print ("Preparing for: "+str(iterations)+" projections" )
    graph=createWf()
    
    start_time = time.time()
    
    #Uncomment to run the workflow without provenance
    #process_and_return(graph, input_data)
    elapsed_time = time.time() - start_time
    print ("ELAPSED TIME: "+str(elapsed_time))
    
    
    
    
    Preparing for: 2 projections
    ELAPSED TIME: 4.1961669921875e-05
    

    Provenance Types

    Type for contextual metadata:

    StockType: defines the metadata properties to be extracted from the data produced on a specific output port: its extractItemMetadata method returns the dictionary (or list of dictionaries) stored as contextual metadata for each element of the computation.

    Types capturing provenance patterns:

    ASTGrouped (Accumulate State Trace Grouped): manages the provenance of a stateful operator with grouping rules.

    IntermediateStatefulOut: captures a stateful component that produces distinct but interdependent outputs.

    
    
    In [4]:
    class StockType(ProvenanceType):
        def __init__(self):
            ProvenanceType.__init__(self)
            self.streammeta=[]
            self.count=1
            self.addNamespacePrefix("hft","http://hft.eu/ns/#")
        
        def makeUniqueId(self,**kwargs):
    
            #produce the id
            id=str(uuid.uuid1())
    
            #Store here the id into the data (type specific):
            if 'data' in kwargs:
                data=kwargs['data']
    
            #Return
            return id
        
        def extractItemMetadata(self,data,port):
            try:
                metadata=None
                self.embed=True
                self.streammeta.append({'val':str(data)})
                 
                #embed metadata for every item (change the modulo to batch the extraction)
                if (self.count%1==0):
                    
                    metadata=deepcopy(self.streammeta)
                    self.provon=True
                    self.streammeta=[]
                else:
                    self.provon=False
                
                self.count+=1
                return metadata
                    
                     
    
            except Exception:
                self.log("Applying default metadata extraction:"+str(traceback.format_exc()))
                self.error=self.error+"Extract Metadata error: "+str(traceback.format_exc())
                return super(StockType, self).extractItemMetadata(data,port);
                
    
                
    class ASTGrouped(ProvenanceType):
        def __init__(self):
            ProvenanceType.__init__(self)
            
      
        def apply_derivation_rule(self,event,voidInvocation,iport=None,oport=None,data=None,metadata=None):
           
             
    
            
            #On write: link the output to the state accumulated for the
            #group of the current input (hash of its grouping keys)
            if (event=='write'):
                vv=str(abs(make_hash(tuple([self.getInputAt(port=iport,index=x) for x in self.inputconnections[iport]['grouping']]))))
                self.setStateDerivations([vv])
    
            if (event=='end_invocation_event' and voidInvocation==False):
                 self.discardInFlow()
                 self.discardState()
    
            #Invocation produced no output: fold the input into the
            #provenance state of its group for future derivations
            if (event=='end_invocation_event' and voidInvocation==True):
                
                if data!=None:
                    
                    vv=str(abs(make_hash(tuple([self.getInputAt(port=iport,index=x) for x in self.inputconnections[iport]['grouping']]))))
                    self.ignorePastFlow()
                    self.update_prov_state(vv,data,metadata={"LOOKUP":str(vv)},dep=[vv])
                    self.discardInFlow()
                    self.discardState()
                    
    
    class IntermediateStatefulOut(ProvenanceType):
         
        def __init__(self):
            ProvenanceType.__init__(self)
            
        
        def apply_derivation_rule(self,event,voidInvocation,iport=None,oport=None,data=None,metadata=None):
             
            self.ignore_past_flow=False
            self.ignore_inputs=False
            self.stateful=False
            
            #Writes to the stateful port update the component's provenance state
            if (event=='write' and oport == self.STATEFUL_PORT):
                self.update_prov_state(self.STATEFUL_PORT,data,metadata=metadata)
                 
    
            #Writes to other ports ignore the past input flow
            if (event=='write' and oport != self.STATEFUL_PORT):
                self.ignorePastFlow()
    
            if (event=='end_invocation_event' and voidInvocation==False):
                
                self.discardInFlow()
                self.discardState()
    

    Prepare the workflow for provenance tracking

    Once the Provenance types have been defined, these are used to configure a workflow execution to comply with the desired provenance collection requirements. Below we illustrate the framework method and the details of this approach.

    • configure_prov_run With this method, the users of the workflow can prepare their run for provenance by indicating which types to apply to each component. Users can also choose where to store the metadata, locally on the file system or on a remote service. These operations can be performed in bulk, with different impacts on the overall overhead and on the perceived responsiveness of access to the lineage information. Additional details on the proposed remote provenance storage and access service will be provided in Chapter V. Finally, general information about the attribution of the run, such as username, run_id, description, workflow_name and workflow_id, is captured and included within the provenance traces.
    • Selectivity-Rules (Advanced) Users can tune the scale of the records produced by indicating in the above method a set of sel-rules for every component. This functionality allows users to declaratively control the data-driven production of the provenance. The approach takes advantage of the contextualisation applied by the provenance types, which extract domain and experimental metadata, and evaluates their values against simple sel-rules of the kind shown below:
    
    
    In [5]:
    #Definition of metadata driven selectivity rules
    
    selrule1 = {"CorrCoef": { 
                             "rules":{ 
                                     "rho": {
                                                "$gt": 0 }
                                    }
                            }
               }
    
    
    #Excludes the start component from the trace
    selrule2 = {"Start": { 
                             "rules":{ 
                                     "iterations": {
                                                "$lt": 0 }
                                    }
                            }
               }
    
    
    #Configuration setup of the provenance execution of the run
    prov_config = {
         "provone:User": "aspinuso",
         "s-prov:workflowName": "CAW",
         "s-prov:workflowType": "hft:CorrelationAnalysis",
         "s-prov:workflowId": "190341",
         "s-prov:description": "CAW Test Case",
         "s-prov:save-mode": "service",
         "s-prov:componentsType":
         {"MaxClique": {"s-prov:type": (IntermediateStatefulOut,),
                        "s-prov:stateful-port": "graph",
                        "s-prov:prov-cluster": "hft:StockAnalyser"},
    
          "CorrMatrix": {"s-prov:type": (ASTGrouped, StockType,),
                         "s-prov:prov-cluster": "hft:StockAnalyser"},
    
          "CorrCoef": {"s-prov:type": (SingleInvocationFlow,),
                       "s-prov:prov-cluster": "hft:Correlator"}
          },
          "s-prov:sel-rules": selrule2
    }
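
    To make the semantics concrete: the sel-rules follow a MongoDB-style comparison syntax, and a plain-Python reading of selrule1 (a sketch of the evaluation logic, not the framework's actual implementation) is:

    #Sketch: how a sel-rule filters metadata (assuming $gt/$lt comparison semantics)
    def matches(rules, metadata):
        ops = {"$gt": lambda a, b: a > b, "$lt": lambda a, b: a < b}
        return all(ops[op](metadata.get(key), bound)
                   for key, conds in rules.items()
                   for op, bound in conds.items())
    
    #With selrule1, only CorrCoef records with a positive rho would be traced
    print(matches(selrule1["CorrCoef"]["rules"], {"rho": 0.87}))   # True
    print(matches(selrule1["CorrCoef"]["rules"], {"rho": -0.42}))  # False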
    
    
    
    In [6]:
    #Store via service
    ProvenanceType.REPOS_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/insert'
    
    #Export data lineage via service (REST GET Call on dataid resource /data/<id>/export)
    ProvenanceType.PROV_EXPORT_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/'
    
    #Store to local path
    ProvenanceType.PROV_PATH='./prov-files/'
    
    #Size of the provenance bulk before sent to storage or sensor
    ProvenanceType.BULK_SIZE=20
    
    rid=None
    
    def createGraphWithProv():
        
        graph=createWf()
        
        # Randomly generated unique identifier for the current run
        rid='CAW_JUP_'+getUniqueId()
    
        
        # Finally, the provenance-enhanced graph is prepared:
        #Initialise provenance storage and associate a Provenance type with specific components:
        configure_prov_run(graph,None,
                           provImpClass=(SingleInvocationFlow,),
                           username=prov_config["provone:User"],
                           runId=rid,
                           input=[{'name':'variables_number','val':variables_number},
                                  {'name':'sampling_rate','val':sampling_rate},
                                  {'name':'batch_size','val':batch_size},
                                  {'name':'iterations','val':iterations}],
                           description=prov_config["s-prov:description"],
                           workflowName=prov_config["s-prov:workflowName"],
                           workflowType=prov_config["s-prov:workflowType"],
                           workflowId="!23123",
                           componentsType= prov_config["s-prov:componentsType"],
                           save_mode=prov_config["s-prov:save-mode"],
                           sel_rules=prov_config["s-prov:sel-rules"],
                           
                          )
        return (graph,rid)
    
    
    graph,rid=createGraphWithProv()
    
    display(graph)
    
    
    
    
    Change grouping implementation 
    CorrCoef Original base class: (<class 'dispel4py.core.GenericPE'>,)
     New type: (<class 'dispel4py.provenance.SingleInvocationFlow'>, <class '__main__.CorrCoef'>)
    CorrMatrix Original base class: (<class 'dispel4py.core.GenericPE'>,)
     New type: (<class '__main__.ASTGrouped'>, <class '__main__.StockType'>, <class '__main__.CorrMatrix'>)
    MaxClique Original base class: (<class 'dispel4py.core.GenericPE'>,)
     New type: (<class '__main__.IntermediateStatefulOut'>, <class '__main__.MaxClique'>)
    Source Original base class: (<class 'dispel4py.core.GenericPE'>,)
    (<class 'dispel4py.provenance.SingleInvocationFlow'>,)
     New type: (<class 'dispel4py.provenance.SingleInvocationFlow'>, <class '__main__.Source'>)
    Source Original base class: (<class 'dispel4py.core.GenericPE'>,)
    (<class 'dispel4py.provenance.SingleInvocationFlow'>,)
     New type: (<class 'dispel4py.provenance.SingleInvocationFlow'>, <class '__main__.Source'>)
    Source Original base class: (<class 'dispel4py.core.GenericPE'>,)
    (<class 'dispel4py.provenance.SingleInvocationFlow'>,)
     New type: (<class 'dispel4py.provenance.SingleInvocationFlow'>, <class '__main__.Source'>)
    Start Original base class: (<class 'dispel4py.core.GenericPE'>,)
    (<class 'dispel4py.provenance.SingleInvocationFlow'>,)
    Start {'rules': {'iterations': {'$lt': 0}}}
     New type: (<class 'dispel4py.provenance.SingleInvocationFlow'>, <class '__main__.Start'>)
    Inputs: {'NewWorkflowRun': [{'input': 'None'}]}
    NewWorkflowRun14: BUILDING INITIAL DERIVATION
    NewWorkflowRun14: STORING WORKFLOW RUN METADATA
    NewWorkflowRun14: Postprocess: (200, 'OK', b'{"inserts": [{"updatedExisting": false, "nModified": 0, "ok": 1.0, "upserted": "CAW_JUP_orfeus-as-11776-426fcf74-2ed2-11e9-a23c-4a0005199ad0", "n": 1}], "success": true}')
    SimplePE: Processed 1 iteration.
    Outputs: {}
    

    Execution with provenance

    The following script executes the workflow in single-process mode, with provenance activated.

    
    
    In [7]:
    #Launch in simple process
    start_time = time.time()
    process_and_return(graph, input_data)
    #Example of a parallel run: mpiexec -n 6 dispel4py mpi dispel4py.examples.graph_testing.pipeline_test
    elapsed_time = time.time() - start_time
    print ("ELAPSED TIME: "+str(elapsed_time))
    
    
    
    
    Start7: BUILDING INITIAL DERIVATION
    Start7: Checking Selectivity-Rules: {'rules': {'iterations': {'$lt': 0}}}
    Start7: 2
    CorrCoef9: <class 'NoneType'>
    CorrCoef9: <class 'NoneType'>
    CorrCoef9: <class 'NoneType'>
    CorrCoef9: <class 'NoneType'>
    CorrCoef9: <class 'NoneType'>
    CorrCoef9: <class 'NoneType'>
    CorrMatrix12: <class 'tuple'>
    CorrMatrix12: <class 'tuple'>
    CorrMatrix12: <class 'tuple'>
    
    CorrMatrix12: 
    [[ 1.         -0.02529936 -0.61088783]
     [-0.02529936  1.          0.80691882]
     [-0.61088783  0.80691882  1.        ]]
    CorrMatrix12: <class 'tuple'>
    
    CorrMatrix12: 
    [[ 1.         -0.26333523 -0.79413709]
     [-0.26333523  1.         -0.37716388]
     [-0.79413709 -0.37716388  1.        ]]
    MaxClique13: <class 'numpy.ndarray'>
    MaxClique13: <class 'numpy.ndarray'>
    
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    /anaconda3/lib/python3.6/site-packages/networkx/drawing/nx_pylab.py:611: MatplotlibDeprecationWarning: isinstance(..., numbers.Number)
      if cb.is_numlike(alpha):
    
    Source8: Postprocess: (200, 'OK', b'{"inserts": ["Source8_write_orfeus-as-11776-44f4bb74-2ed2-11e9-9c2c-4a0005199ad0", "Source8_write_orfeus-as-11776-44f9bc5a-2ed2-11e9-bcbd-4a0005199ad0"], "success": true}')
    Source10: Postprocess: (200, 'OK', b'{"inserts": ["Source10_write_orfeus-as-11776-44feaca6-2ed2-11e9-babe-4a0005199ad0", "Source10_write_orfeus-as-11776-4503fb28-2ed2-11e9-bf86-4a0005199ad0"], "success": true}')
    Source11: Postprocess: (200, 'OK', b'{"inserts": ["Source11_write_orfeus-as-11776-4509686c-2ed2-11e9-b902-4a0005199ad0", "Source11_write_orfeus-as-11776-450eeddc-2ed2-11e9-9e1f-4a0005199ad0"], "success": true}')
    CorrCoef9: Postprocess: (200, 'OK', b'{"inserts": ["CorrCoef9_stateful_orfeus-as-11776-450f0a7e-2ed2-11e9-b9a2-4a0005199ad0", "CorrCoef9_stateful_orfeus-as-11776-450f1a28-2ed2-11e9-9d9e-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-450fe374-2ed2-11e9-9c7c-4a0005199ad0", "CorrCoef9_stateful_orfeus-as-11776-45100b68-2ed2-11e9-b86c-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-45102846-2ed2-11e9-92c2-4a0005199ad0", "CorrCoef9_stateful_orfeus-as-11776-451044fa-2ed2-11e9-bbd3-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-45105c4c-2ed2-11e9-b57b-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-45107258-2ed2-11e9-8304-4a0005199ad0", "CorrCoef9_stateful_orfeus-as-11776-451087d2-2ed2-11e9-bf8a-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-45109a62-2ed2-11e9-81f1-4a0005199ad0", "CorrCoef9_write_orfeus-as-11776-4510a992-2ed2-11e9-9aaa-4a0005199ad0", "CorrCoef9_stateful_orfeus-as-11776-4510bf22-2ed2-11e9-bd5c-4a0005199ad0"], "success": true}')
    CorrMatrix12: Postprocess: (200, 'OK', b'{"inserts": ["CorrMatrix12_write_orfeus-as-11776-45391562-2ed2-11e9-9fde-4a0005199ad0", "CorrMatrix12_write_orfeus-as-11776-4563cd3e-2ed2-11e9-a412-4a0005199ad0"], "success": true}')
    MaxClique13: Postprocess: (200, 'OK', b'{"inserts": ["MaxClique13_write_orfeus-as-11776-4568255a-2ed2-11e9-999b-4a0005199ad0", "MaxClique13_write_orfeus-as-11776-456eb24c-2ed2-11e9-a9b8-4a0005199ad0", "MaxClique13_write_orfeus-as-11776-45713712-2ed2-11e9-9539-4a0005199ad0", "MaxClique13_write_orfeus-as-11776-457471e8-2ed2-11e9-a40e-4a0005199ad0", "MaxClique13_write_orfeus-as-11776-45778bd0-2ed2-11e9-a1a0-4a0005199ad0", "MaxClique13_write_orfeus-as-11776-4579fe24-2ed2-11e9-b235-4a0005199ad0"], "success": true}')
    SimplePE: Processed 1 iteration.
    ELAPSED TIME: 2.326709032058716
    

    Provenance trace extraction from S-ProvFlow

    The following instructions connect to the online S-ProvFlow Web API and download the PROV trace for a single run id.

    
    
    In [8]:
    import xml.etree.ElementTree as ET
    
    print("Extract Trace for run id "+rid)
    
    #urlparse and httplib are available through the dispel4py.provenance import in the first cell
    expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
    connection = httplib.client.HTTPConnection(expurl.netloc)
    url= "http://"+expurl.netloc+expurl.path+rid+"/export?format=xml"
    print(url)
    connection.request(
                    "GET", url)
    response = connection.getresponse()
      
    print("progress: " + str((response.status, response.reason)))
    prov1 = ET.fromstring(response.read())
    print('PROV TO EMBED:')
    print(str(prov1))
    
    
    
    
    Extract Trace for run id CAW_JUP_orfeus-as-11776-426fcf74-2ed2-11e9-a23c-4a0005199ad0
    http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/CAW_JUP_orfeus-as-11776-426fcf74-2ed2-11e9-a23c-4a0005199ad0/export?format=xml
    progress: (200, 'OK')
    PROV TO EMBED:
    <Element '{http://www.w3.org/ns/prov#}document' at 0x1a1a9239a8>
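
    The parsed trace is just an ElementTree root. As a quick sanity check (illustrative, standard library only, reusing the prov1 variable from the cell above), one can tally the top-level PROV statements by element type:

    from collections import Counter
    
    #Count the top-level PROV statements (entity, activity, wasDerivedFrom, ...)
    tags = Counter(child.tag.split('}')[-1] for child in prov1)
    print(tags.most_common())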
    

    Visualise Provenance Trace

    Read the run id from the output above to locate the provenance trace on the remote service.

    Visualise in S-ProvFlow

    The following link opens a local installation of the S-ProvFlow System GUI. Enter your username and open the run with the RunID shown above.

    http://127.0.0.1:8180/provenance-explorer/html/view.jsp

    
    
    In [9]:
    import prov
    from IPython.display import Image
    import io
    from prov.model import ProvDocument, ProvBundle, ProvException, first, Literal
    from prov.dot import prov_to_dot
    
    def provURLToPNG(xml,format):
         
        xml_doc = io.BytesIO(urllib.request.urlopen(xml).read())
        doc=ProvDocument.deserialize(xml_doc,format="xml")
        dot = prov_to_dot(doc)
        
        if format=="png":
            dot.write_png('PROV.png')
            return 'PROV.png' 
        if format=="svg":
            dot.write_svg('PROV.svg')
            return 'PROV.svg'
        
        
    
    
    
    
    
    expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
    connection = httplib.client.HTTPConnection(expurl.netloc)
    url= "http://"+expurl.netloc+expurl.path+rid+"/export?all=true"
    print(url)
    png_content=provURLToPNG(url,"png")
    
    Image(png_content)
    
    
    
    
    http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/CAW_JUP_orfeus-as-11776-426fcf74-2ed2-11e9-a23c-4a0005199ad0/export?all=true
    
    Out[9]:
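
    The same helper can emit SVG, which scales better for large traces; for instance (reusing url from the cell above):

    from IPython.display import SVG
    
    #Render the trace as SVG instead of PNG (provURLToPNG writes PROV.svg)
    SVG(filename=provURLToPNG(url, "svg"))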

    Developing Provenance Sensors

    The classes below show a sample ProvenanceSensorToService and a slightly more advanced one that allows for feedback.

    ProvenanceSensorToService

    Receives traces from the PEs and sends them to an external provenance store.

    
    
    In [10]:
    class ProvenanceSensorToService(ProvenanceRecorder):
    
        def __init__(self, name='ProvenanceRecorderToService', toW3C=False):
            ProvenanceRecorder.__init__(self)
            self.name = name
            self.numprocesses=2
            self.convertToW3C = toW3C
             
        def _postprocess(self):
            self.connection.close()
            
        def _preprocess(self):
            self.provurl = urlparse(ProvenanceRecorder.REPOS_URL)
            self.connection = httplib.client.HTTPConnection(
                self.provurl.netloc)
            
        def sendToService(self,prov):
            params = urllib.parse.urlencode({'prov': ujson.dumps(prov)})
            headers = {
                        "Content-type": "application/x-www-form-urlencoded",
                        "Accept": "application/json"}
            self.connection.request(
                        "POST",
                        self.provurl.path,
                        params,
                        headers)
    
            response = self.connection.getresponse()
            self.log("Postprocress: " +
                     str((response.status, response.reason, response)))
            self.connection.close()
            
        def process(self, inputs):
            try:
                 
                for x in inputs:
                    
                    prov = inputs[x]
                    
                    
                    if "_d4p" in prov:
                        prov = prov["_d4p"]
                    elif "provenance" in prov:
                        prov = prov["provenance"]
                        
                    #self.log(prov)
                    self.sendToService(prov)
                    
                    
            except:
                self.log(traceback.format_exc())
    

    Provenance Sensor with Feedback - MyProvenanceSensorWithFeedback

    Receives traces from the PEs and reads their content. Depending on the 'name' of the PE sending the lineage, feedback messages are prepared and sent back.

    
    
    In [ ]:
    class MyProvenanceSensorWithFeedback(ProvenanceRecorder):
    
        def __init__(self, toW3C=False):
            ProvenanceRecorder.__init__(self)
            self.convertToW3C = toW3C
            self.bulk = []
            self.timestamp = datetime.datetime.utcnow()
    
        def _preprocess(self):
            self.provurl = urlparse(ProvenanceRecorder.REPOS_URL)
    
            self.connection = httplib.client.HTTPConnection(
                self.provurl.netloc)
    
        def postprocess(self):
            self.connection.close()
            
        def _process(self, inputs):
            prov = None
            for x in inputs:
                prov = inputs[x]
            out = None
            if isinstance(prov, list) and "data" in prov[0]:
                prov = prov[0]["data"]
    
            if self.convertToW3C:
                out = toW3Cprov(prov)
            else:
                out = prov
    
                
                
            self.write(self.porttopemap[prov['name']], "FEEDBACK MESSAGE FROM RECORDER")
    
            self.bulk.append(out)
            params = urllib.parse.urlencode({'prov': json.dumps(self.bulk)})
            headers = {
                "Content-type": "application/x-www-form-urlencoded",
                "Accept": "application/json"}
            self.connection.request(
                "POST", self.provurl.path, params, headers)
            response = self.connection.getresponse()
            self.log("progress: " + str((response.status, response.reason,
                                             response, response.read())))
            
    
            return None