The following "Live" notebook demonstrates a simple workflow implemented with a data-intensive processing library (dispel4py), which has been extended with a configurable and programmable provenance tracking framework (Active Provenance).
SWOT form:
https://docs.google.com/presentation/d/10xlRYytR7NB9iC19T29BD-rW77ZAtnjtlukMJDP_MIs/edit?usp=sharing
In [1]:
import xarray
import netCDF4
import json
from dispel4py.workflow_graph import WorkflowGraph
from dispel4py.provenance import *
from collections import OrderedDict
import time
import random
from dispel4py.base import create_iterative_chain, ConsumerPE, IterativePE, SimpleFunctionPE
import matplotlib.pyplot as plt
import traceback
import numpy as np
from pprint import pprint
Simple workflow, xarray in, xarray out. The generic processing elements are defined below. The GenericPE belongs to the dispel4py framework; it allows data objects to be passed as inputs and outputs. The components are linked and visualised via the workflow_graph module.
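For orientation, a minimal pass-through component could look like the sketch below. It is not part of the original workflow and only uses the GenericPE calls (port declaration, _process, write) that appear in the cells that follow.
class PassThrough(GenericPE):
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('output')
    def _process(self, inputs):
        # forward the received data object unchanged to the output port
        self.write('output', inputs['input'])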
In [2]:
class Read(GenericPE):
def __init__(self):
GenericPE.__init__(self)
self._add_input('input')
self._add_output('xarray')
self.count=0
def _process(self,inputs):
self.log('Read_Process')
self.log(inputs)
inputLocation = inputs['input'][0]
ds = xarray.open_dataset( inputLocation )
self.write( 'xarray' , (ds, self.count) , location=inputLocation )
self.count+=1
class Write(GenericPE):
def __init__(self):
GenericPE.__init__(self)
self._add_input('input')
self._add_output('location')
self.count=0
def _process(self,inputs):
self.log('Write_Function')
outputLocation = "data/new_"+str(self.count)+".nc"
self.count+=1
inputs['input'][0].to_netcdf( outputLocation )
self.write('location', outputLocation,location=outputLocation)
class Analysis(GenericPE):
def __init__(self):
GenericPE.__init__(self)
self._add_input('input')
self._add_output('output')
def _process(self,inputs):
self.log('Analysis_process')
nc = inputs['input'][0]
#Apply some np mathematics on nc
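        # (Illustrative sketch, not in the original notebook.) A threshold clip on the
        # configured variable could, for example, be applied here:
        #   nc[self.parameters['var']] = nc[self.parameters['var']].where(
        #       nc[self.parameters['var']] > self.parameters['filter'])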
#Write to the output channel
self.write('output', (nc,inputs['input'][1]),metadata={'myterm': 10,'prov:type':'clipc:Pre-processed','index':inputs['input'][1]},message="")
class Combine(GenericPE):
def __init__(self):
GenericPE.__init__(self)
self._add_input('combine1',grouping=[1])
self._add_input('combine2',grouping=[1])
self._add_output('combo')
self.count=0
self.data1=[]
self.data2=[]
self.nc1 = None
self.nc2 = None
self.out = None
def _process(self,inputs):
self.log('Combine_process')
#self.log(inputs.keys())
if 'combine1' in inputs.keys():
self.data1.append(inputs['combine1'][0])
if 'combine2' in inputs.keys():
self.data2.append(inputs['combine2'][0])
if (len(self.data1)>0 and len(self.data2)>0):
nc1 = self.data1.pop(0)
nc2 = self.data2.pop(0)
nc=nc1
# numpy arithmetic for DataArrays...
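            # (Illustrative sketch, not in the original notebook.) The two inputs could,
            # for instance, be averaged on the configured variable:
            #   nc[self.parameters['var']] = (nc1[self.parameters['var']] +
            #                                 nc2[self.parameters['var']]) / 2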
self.log(nc2[self.parameters['var']][0][0][0])
            for k,v in nc2.attrs.items():
                # copy (or overwrite) attributes from nc2 into the combined output
                nc.attrs[k] = v
self.write('combo', (nc,self.count))
self.count+=1
In [3]:
#Initialise the graph
def createWorkflowGraph():
readX = Read()
readX.name = 'COLLECTOR1'
readY = Read()
readY.name = 'COLLECTOR2'
analyse = Analysis()
analyse.name = 'ANALYSIS'
analyse.parameters = { 'var':'SWE','filter': 10 }
analyse2 = Analysis()
analyse2.name = 'ANALYSIS'
analyse2.parameters = { 'var':'SWE','filter': 1 }
wf3 = Combine()
wf3.name = 'COMBINE'
wf3.parameters = { 'var':'SWE' }
writeX = Write()
writeX.name = 'STORE'
graph = WorkflowGraph()
graph.connect(readX ,'xarray' , analyse ,'input')
graph.connect(readY ,'xarray' , analyse2 ,'input')
graph.connect( analyse ,'output' , wf3 ,'combine1')
graph.connect( analyse2 ,'output' , wf3 ,'combine2')
graph.connect(wf3 ,'combo' , writeX , 'input')
return graph
graph = createWorkflowGraph()
from dispel4py.visualisation import display
display(graph)
A simple JSON representation is used to define the initial input data for each named component of the workflow. Every component can receive a list of inputs. These will be streamed serially or in parallel, depending on the execution mode.
The actual data are collected from a THREDDS repository. A slow internet connection may result in longer executions.
In [4]:
name="new"
input_data = {
'COLLECTOR1': [ { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']},
{ 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}],
'COLLECTOR2': [ { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']},
{ 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}]
}
In [5]:
input_data = {
'COLLECTOR1': [ { 'input': [ 'data/local_1.nc']},{ 'input': [ 'data/local_1.nc']}],
'COLLECTOR2': [ { 'input': [ 'data/local_0.nc']},{ 'input': [ 'data/local_0.nc']}]
}
In [5]:
def runExampleWorkflow(graph):
print(input_data)
#Launch in simple process
result = simple_process.process_and_return(graph, input_data)
print("\n RESULT: "+str(result))
#runExampleWorkflow(graph)
Once the Provenance types have been defined, they are used to configure a workflow execution so that it complies with the desired provenance collection requirements. Below we illustrate the framework method and the details of this approach.
Type for contextual metadata in NetCDF:
netcdfProvType: applicable to components that deal with data formats containing array-oriented scientific data, including multiple variables and attributes associated with standard vocabularies and uid schemas.
Type capturing provenance patterns:
Nby1Flow: manages the lineage of a component whose output depends on the data received on all of its input ports in lock-step; e.g. a combined analysis of multiple variables.
In [6]:
class netcdfProvType(ProvenanceType):
def __init__(self):
ProvenanceType.__init__(self)
self.addNamespacePrefix("clipc","http://clipc.eu/ns/#")
self.addNamespacePrefix("var","http://openprovenance.org#")
def extractDataSourceId(self,data, input_port):
#Extract here the id from the data (type specific):
        self.log('netcdfProvType.extractDataSourceId')
try:
ds = xarray.open_dataset(data[0])
id = ds.attrs['id']
except Exception:
id = str(uuid.uuid1())
return id
def makeUniqueId(self, data, output_port):
#self.log('ANDREJ.makeUniqueId')
#produce the id
id=str(uuid.uuid1())
''' if nc data '''
if data!=None:
xa = data[0]
''' unique as defined by the community standard '''
xa.attrs['id'] = id
#Return
return id
''' extracts xarray metadata '''
def extractItemMetadata(self, data, output_port):
try:
nc_meta = OrderedDict()
            ''' cycle through all attributes, dimensions and variables '''
xa = data[0]
# dataset meta
nc_meta['Dimensions'] = str( dict(xa.dims))
nc_meta['Type'] = str(type(xa))
for n , i in xa.data_vars.items():
for k , v in i.attrs.items():
nc_meta['clipc:'+n+"_"+str(k).replace(".","_")] = str(v)[0:25]
metadata = [nc_meta]
return metadata
except Exception:
self.log("Applying default metadata extraction"+str(traceback.format_exc()))
self.error=self.error+"Applying default metadata extraction:"+str(traceback.format_exc())
return super(netcdfProvType, self).extractItemMetadata(data,output_port);
class Nby1Flow(ProvenanceType):
def __init__(self):
ProvenanceType.__init__(self)
self.ports_lookups={}
def apply_derivation_rule(self,event,voidInvocation,oport=None,data=None,iport=None,metadata=None):
        # Reacts to specific events (write, end_invocation_event).
        # The latter is characterised by whether the invocation returned data (voidInvocation).
if (event=='write'):
dep=[]
for x in self.inputconnections:
if x!=iport and x!='_d4py_feedback':
vv=self.ports_lookups[x].pop(0)
dep.append(vv)
#self.log("LOOKUP: "+str(vv))
self.setStateDerivations(dep)
if (event=='end_invocation_event' and voidInvocation==True):
if data!=None:
#self.ports_lookups['iport'].append(vv)
vv=str(abs(make_hash(tuple(iport+str(self.iterationIndex)))))
if not (iport in self.ports_lookups):
self.ports_lookups[iport]=[]
self.ports_lookups[iport].append(vv)
#self.log(self.ports_lookups)
#self.ignorePastFlow()
self.update_prov_state(vv,None,metadata={"LOOKUP":str(vv)})
self.discardInFlow()
if (event=='end_invocation_event' and voidInvocation==False):
self.discardInFlow()
self.discardState()
The 'configuration' below describes the provenance setup. The selection rule (selrule) applies to the ANALYSIS component and matches outputs whose 'myterm' metadata value is below 15 (the value 10 written by ANALYSIS satisfies it), while prov_config sets the general run metadata (user, workflow name, type, id, save mode) and assigns the Provenance Types and provenance clusters to the workflow components.
In [7]:
selrule = {"ANALYSIS": {
"rules":{
"myterm": {
"$lt": 15 }
}
}
}
prov_config = {
'provone:User': "aspinuso",
's-prov:description' : "provdemo demokritos",
's-prov:workflowName': "demo_ecmwf",
's-prov:workflowType': "clipc:combine",
's-prov:workflowId' : "workflow process",
's-prov:save-mode' : 'service' ,
# defines the Provenance Types and Provenance Clusters for the Workflow Components
's-prov:componentsType' :
{'ANALYSIS': {'s-prov:type':(netcdfProvType,),
's-prov:prov-cluster':'clipc:Combiner'},
'COMBINE': {'s-prov:type':(netcdfProvType, Nby1Flow,),
's-prov:prov-cluster':'clipc:Combiner'},
    'COLLECTOR1':{'s-prov:prov-cluster':'clipc:DataHandler',
's-prov:type':(netcdfProvType,)},
    'COLLECTOR2':{'s-prov:prov-cluster':'clipc:DataHandler',
's-prov:type':(netcdfProvType,)},
'STORE': {'s-prov:prov-cluster':'clipc:DataHandler'}
},
's-prov:sel-rules': selrule
}
REPOS_URL is the target provenance repository and PROV_EXPORT_URL is the endpoint used later to export the traces. PROV_PATH sets the local folder for provenance files, and BULK_SIZE controls how many provenance documents are buffered before being sent to the storage or to the sensor.
In [8]:
#Store via service
#ProvenanceType.REPOS_URL='http://127.0.0.1:8082/workflowexecutions/insert'
#ProvenanceType.PROV_EXPORT_URL='http://127.0.0.1:8082/data/'
#ProvenanceType.REPOS_URL='http://snf-3480.ok-kno.grnetcloud.net/prov/workflowexecutions/insert'
#ProvenanceType.PROV_EXPORT_URL='http://snf-3480.ok-kno.grnetcloud.net/prov/data/'
#http://snf-3480.ok-kno.grnetcloud.net:8082/swagger/
ProvenanceType.REPOS_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/insert'
ProvenanceType.PROV_EXPORT_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/data/'
#Store to local path
ProvenanceType.PROV_PATH='./prov-files/'
#Size of the provenance bulk before it is sent to storage or to the sensor
ProvenanceType.BULK_SIZE=1
#ProvenancePE.REPOS_URL='http://climate4impact.eu/prov/workflow/insert'
In [9]:
def createGraphWithProv():
graph=createWorkflowGraph()
#Location of the remote repository for runtime updates of the lineage traces. Shared among ProvenanceRecorder subtypes
    # Randomly generated unique identifier for the current run
rid='JUP_COMBINE_'+getUniqueId()
    # Finally, the provenance-enhanced graph is prepared:
#Initialise provenance storage to service:
configure_prov_run(graph,
provImpClass=(ProvenanceType,),
username=prov_config['provone:User'],
runId=rid,
description=prov_config['s-prov:description'],
workflowName=prov_config['s-prov:workflowName'],
workflowType=prov_config['s-prov:workflowType'],
workflowId=prov_config['s-prov:workflowId'],
save_mode=prov_config['s-prov:save-mode'],
componentsType=prov_config['s-prov:componentsType'],
sel_rules=prov_config['s-prov:sel-rules']
)
return graph,rid
graph,rid=createGraphWithProv()
display(graph)
In [10]:
runExampleWorkflow(graph)
update_prov_run(rid,save_mode=prov_config['s-prov:save-mode'],dic={'status':'terminated','endTime':str(datetime.datetime.utcnow())})
In [11]:
# read the id of the output to locate its provenance trace
import xml.etree.ElementTree as ET
finalFile = 'data/new_'+str(len(input_data['COLLECTOR1'])-1)+'.nc'
from shutil import copyfile
#copyfile(finalFile, 'data/newA.nc')
ds = xarray.open_dataset(finalFile)
dataid = ds.attrs['id'] #"orfeus-as-73355-c381c282-d422-11e6-ac42-f45c89acf865"
print("Extract Trace for dataid: "+dataid)
expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
connection = httplib.client.HTTPConnection(expurl.netloc)
url="http://"+expurl.netloc+expurl.path+dataid+"/export?level=100&format=xml"
print(url)
connection.request("GET", url)
response = connection.getresponse()
print("progress: " + str((response.status, response.reason)))
prov1 = ET.fromstring(response.read())
print('PROV TO EMBED:')
print(str(prov1))
Embed the PROV document in the NetCDF file, within the 'prov_xml' attribute of the 'provenance' variable, and print the file's properties.
In [12]:
# create a provenance variable in the dataset and save it to file
ds.load()
ds['provenance'] = xarray.DataArray("")
ds['provenance'].attrs['prov_xml']=str(prov1)
ds.to_netcdf(str(finalFile+"_PROV"))
ds = xarray.open_dataset(str(finalFile+"_PROV"))
print(ds)
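As a quick sanity check (not part of the original notebook), the embedded trace can be read back from the new file through the same attribute:
ds_check = xarray.open_dataset(str(finalFile+"_PROV"))
# print the first characters of the embedded PROV-XML document
print(ds_check['provenance'].attrs['prov_xml'][:200])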
Visualise the PROV document in its standard graphical view
In [21]:
import prov
from IPython.display import Image
import io
import urllib.request
from prov.model import ProvDocument, ProvBundle, ProvException, first, Literal
from prov.dot import prov_to_dot
def provURLToPNG(xml,format):
xml_doc = io.BytesIO(urllib.request.urlopen(xml).read())
doc=ProvDocument.deserialize(xml_doc,format="xml")
dot = prov_to_dot(doc)
if format=="png":
dot.write_png('PROV.png')
return 'PROV.png'
if format=="svg":
dot.write_svg('PROV.svg')
return 'PROV.svg'
png_content=provURLToPNG(url,"png")
Image(png_content)
# visualise NetCDF provenance as PNG
Out[21]:
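The same helper can also produce an SVG rendering; a small usage sketch (not part of the original cell) using IPython's SVG display class:
from IPython.display import SVG
# write PROV.svg via the helper defined above and display it inline
SVG(filename=provURLToPNG(url,"svg"))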
the end.
In [ ]: