Provenance Integration in NetCDF/xarray Data-Intensive Workflows

Authors: Alessandro Spinuso and Andrej Mihajlovski

Royal Netherlands Meteorological Institute (KNMI)

The following "live" notebook demonstrates a simple workflow implemented with a data-intensive processing library (dispel4py) that has been extended with a configurable and programmable provenance tracking framework (Active Provenance).

Highlights - Active Provenance, S-PROV and S-ProvFlow

  • The provenance information produced can be tuned and adapted to computational, precision and contextualisation requirements
  • The Active framework allows for the traceability of data-reuse across different executions, methods and users
  • The provenance can be stored as files or sent at run-time to an external repository (S-ProvFlow)
  • The repository can be searched and explored via interactive tools
  • The provenance model is designed around a hybrid data-flow model, which takes into account data-streams and concrete data resources, e.g. file locations, web services etc.
  • The lineage can be exported from the repository in W3C PROV format. This facilitates the production of interoperable reports and data-curation tasks. For instance, the provenance related to specific data can be stored in W3C PROV XML format within structured file formats (NetCDF) as well as in institutional and general-purpose citable data-repositories.

Demonstration outline

1 - Workflow specification and execution

  1. Define the Classes of the Workflow Components
  2. Construct the Workflow application
  3. Prepare the Input
  4. Visualise and run the workflow without provenance

2 - Provenance Types, Configuration and Contextualisation

  1. Define the Provenance Types to be used within the workflow
  2. Configure the workflow for provenance tracking
  3. Visualise and run the workflow with provenance activated
  4. Export and embed provenance within NetCDF results
  5. Explore the resulting provenance with interactive and static visualisations

3 - Data-reuse traceability

  1. Change the input and demonstrate the consistency of provenance for data-reuse across multiple workflow executions
  2. Discuss more complex use cases and configuration options

4 - Informal Evaluation

SWOT form:

https://docs.google.com/presentation/d/10xlRYytR7NB9iC19T29BD-rW77ZAtnjtlukMJDP_MIs/edit?usp=sharing

1 - Workflow specification and execution

  • The dispel4py framework is utilised for the workflows
  • xarray for in-memory management of NetCDF data
  • Matplotlib for visualisation
  • W3C PROV for provenance representation

In [1]:
import xarray
import netCDF4
import json

from dispel4py.workflow_graph import WorkflowGraph 
from dispel4py.provenance import *

from collections import OrderedDict
import time
import random

from dispel4py.base import create_iterative_chain, ConsumerPE, IterativePE, SimpleFunctionPE

import matplotlib.pyplot as plt
import traceback
import numpy as np
from pprint import pprint

A simple workflow: xarray in, xarray out. The generic processing elements are defined below. The GenericPE class belongs to the dispel4py framework; it allows data objects to be passed as inputs and outputs. The components are linked and visualised via the workflow_graph module.

1.1 The Four Workflow Components:

  1. Read: an xarray Dataset is read into memory.
  2. Analysis: the Dataset is processed and passed to the output (a dummy step; no real changes in this example).
  3. Write: the Dataset is written to a NetCDF file.
  4. Combine: two Datasets are combined into one.

In [2]:
class Read(GenericPE):
    
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('xarray')
        self.count=0

    def _process(self,inputs):
        self.log('Read_Process')
     
        self.log(inputs)
        
        inputLocation = inputs['input'][0]

        ds = xarray.open_dataset( inputLocation )
        
        self.write( 'xarray' , (ds, self.count) , location=inputLocation )
        self.count+=1
            
class Write(GenericPE):
    
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('location')
        self.count=0
         
         
        
    def _process(self,inputs):
        self.log('Write_Function')
         
        outputLocation = "data/new_"+str(self.count)+".nc"
        self.count+=1
        inputs['input'][0].to_netcdf( outputLocation )
         
        self.write('location', outputLocation,location=outputLocation)
        
        
        
class Analysis(GenericPE):
        
    def __init__(self):
        GenericPE.__init__(self)
        self._add_input('input')
        self._add_output('output')
         
        
    def _process(self,inputs):
        self.log('Analysis_process')
         
        nc = inputs['input'][0]
        
        #Apply some np mathematics on nc
        
        
        #Write to the output channel
        self.write('output', (nc,inputs['input'][1]),metadata={'myterm': 10,'prov:type':'clipc:Pre-processed','index':inputs['input'][1]},message="")
        
        


class Combine(GenericPE):
     
    def __init__(self):
        GenericPE.__init__(self)
        # group incoming tuples by element [1] (the iteration index),
        # so matching iterations from both streams meet at this PE
        self._add_input('combine1',grouping=[1])
        self._add_input('combine2',grouping=[1])
        self._add_output('combo')
        self.count=0
        self.data1=[]
        self.data2=[]
        self.nc1 = None
        self.nc2 = None
        self.out = None
        
    def _process(self,inputs):
        self.log('Combine_process')
        #self.log(inputs.keys())
        
        if 'combine1' in inputs.keys():
            self.data1.append(inputs['combine1'][0])
            
        
        if 'combine2' in inputs.keys():
            self.data2.append(inputs['combine2'][0])
            
        
         
        if (len(self.data1)>0 and len(self.data2)>0):
            
            nc1 =  self.data1.pop(0)
            nc2 =  self.data2.pop(0)
            
            nc=nc1
            # placeholder: numpy arithmetic on the DataArrays would go here
             
            self.log(nc2[self.parameters['var']][0][0][0])
            
            
            
            # merge the attributes of nc2 into nc (nc2 values take precedence)
            for k,v in nc2.attrs.items():
                nc.attrs[k] = v
            
           
            
            self.write('combo', (nc,self.count))
            self.count+=1

1.2 Construct the Workflow application

Instantiates the components and combines them into a workflow graph, which is then visualised.

Six instances are created from the PE classes defined above.


In [3]:
#Initialise the graph
def createWorkflowGraph():
    readX  = Read()
    readX.name = 'COLLECTOR1'
    readY  = Read()
    readY.name = 'COLLECTOR2'
    
    analyse   = Analysis()
    analyse.name    = 'ANALYSIS'
    analyse.parameters = { 'var':'SWE','filter': 10 }

    analyse2   = Analysis()
    analyse2.name    = 'ANALYSIS'
    analyse2.parameters = { 'var':'SWE','filter': 1 }
    
    wf3     = Combine()
    wf3.name    = 'COMBINE'
    wf3.parameters = { 'var':'SWE' }
    
    writeX = Write()
    writeX.name = 'STORE'
    
    
    graph = WorkflowGraph()    
    
    graph.connect(readX ,'xarray'   , analyse      ,'input')
    graph.connect(readY ,'xarray'   , analyse2     ,'input')
    
    graph.connect( analyse  ,'output'   , wf3     ,'combine1')
    graph.connect( analyse2 ,'output'   , wf3     ,'combine2')
    
    graph.connect(wf3    ,'combo'   , writeX , 'input')

    return graph



graph = createWorkflowGraph()


from dispel4py.visualisation import display
display(graph)


1.3 Specify the Input

A simple JSON representation is used to define the initial input data for each named component of the workflow. Every component can receive a list of inputs; these will be streamed serially or in parallel, depending on the execution mode (a parallel-execution sketch follows the run function below).

Data Collected from external URLs

The actual data are collected from a THREDDS repository. A slow internet connection may result in longer execution times.


In [4]:
name="new"

input_data = {     
                'COLLECTOR1': [ { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']},
                               { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}],
                'COLLECTOR2': [ { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']},
                               { 'input' : [ 'http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}]
                 
             }

Alternatively:

Data Collected from local archive (for data re-use demonstration)


In [5]:
input_data = {     
                'COLLECTOR1': [ { 'input': [ 'data/local_1.nc']},{ 'input': [ 'data/local_1.nc']}],
                'COLLECTOR2': [ { 'input': [ 'data/local_0.nc']},{ 'input': [ 'data/local_0.nc']}]
             }

Function to run the workflow

Uses the single-process mode for demonstration purposes


In [5]:
def runExampleWorkflow(graph):
                                                     
    print(input_data)                   

    #Launch in simple process
    result = simple_process.process_and_return(graph, input_data)
    print("\n RESULT: "+str(result))

#runExampleWorkflow(graph)
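
The input specification above mentions serial or parallel streaming. A minimal, hedged sketch of a parallel run follows; the dispel4py.new.multi_process module path and the argparse-style arguments are assumptions about dispel4py's multiprocessing backend and are not exercised in this demo.


In [ ]:
# Hedged sketch: parallel execution of the same graph with dispel4py's
# multiprocessing backend (module path and args fields are assumptions).
import argparse
from dispel4py.new.multi_process import process as multi_process

mp_args = argparse.Namespace(num=4, simple=False)  # 4 worker processes
#multi_process(graph, input_data, mp_args)         # uncomment to run in parallel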

2 - Provenance Types, Configuration and Contextualisation

2.1 Define a Provenance Type

Once the provenance types have been defined, they are used to configure a workflow execution to comply with the desired provenance collection requirements. Below we illustrate the framework methods and the details of this approach.

  • configure_prov_run With this method, the users of the workflow can configure their run for provenance by indicating which types to apply to each component. Users can also choose where to store the metadata: locally on the file system or in a remote service (a file-mode sketch follows the configuration cell below). These operations can be performed in bulk, with different impacts on the overall overhead and on the responsiveness of lineage queries. Finally, general information about the attribution of the run, such as username, run_id, description, workflow_name and workflow_id, is captured and included within the provenance traces.
  • apply_derivation_rule (Advanced) This method is invoked at each iteration when a decision has to be made about the required lineage pattern. The framework automatically passes information on whether the invocation has produced any output. The method, according to predefined rules, indicates either to discard the current input data or to include it in the StateCollection automatically, capturing its contribution to the next invocation. In our implementation, basic provenance types are available and can be used according to specific needs.
  • Selectivity-Rules (Advanced) Users can tune the scale of the records produced by indicating, through the method above, a set of Selectivity-Rules for every component. This functionality allows users to declaratively specify rules that control the data-driven production of provenance. The approach takes advantage of the contextualisation applied by the provenance types, which extract domain and experimental metadata, and evaluates the extracted values against simple Selectivity-Rules, as sketched below.
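
For illustration, a minimal Selectivity-Rules dictionary in the shape used by the configuration cell of this section: provenance records from the ANALYSIS component are kept only when the extracted term myterm is below 15 (the MongoDB-style $lt operator is the one exercised in this demo).


In [ ]:
# Minimal sketch of a Selectivity-Rules dictionary, matching the shape
# used in the configuration cell below.
selectivity_sketch = {
    "ANALYSIS": {                    # component name, as set on the PE
        "rules": {
            "myterm": {"$lt": 15}    # keep records where myterm < 15
        }
    }
}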

Type for contextual metadata in netcdf:

netcdfProvType: applicable to components that deal with a data format containing array-oriented scientific data, including multiple variables and attributes associated with standard vocabularies and UID schemas.

Type capturing provenance patterns:

Nby1Flow: manages the lineage of a component whose output depends on the data received on all its input ports in lock-step; e.g. a combined analysis of multiple variables.


In [6]:
class netcdfProvType(ProvenanceType):
    def __init__(self):
        ProvenanceType.__init__(self)
        self.addNamespacePrefix("clipc","http://clipc.eu/ns/#")
        self.addNamespacePrefix("var","http://openprovenance.org#")
    
    def extractDataSourceId(self,data, input_port):
        #Extract here the id from the data (type specific):

        self.log('ANDREJ.extractExternalInputDataId')
         
        
        try:
            # use the dataset's community 'id' attribute when available
            ds = xarray.open_dataset(data[0])
            id = ds.attrs['id']
        except Exception:
            # otherwise fall back to a freshly generated identifier
            id = str(uuid.uuid1())

        return id
     
    
    def makeUniqueId(self, data, output_port):

        # produce the id
        id = str(uuid.uuid1())

        # if nc data, stamp the id onto the dataset attributes,
        # making it unique as defined by the community standard
        if data is not None:
            xa = data[0]
            xa.attrs['id'] = id

        return id
    

    
    ''' extracts xarray metadata '''
    def extractItemMetadata(self, data, output_port):
         
        
        try:            
            nc_meta = OrderedDict()
            
            ''' cycle through all attributes, dimensions and variables '''
            xa = data[0]
            # dataset meta
            nc_meta['Dimensions'] = str( dict(xa.dims)) 
            nc_meta['Type'] = str(type(xa))
            
             
            for n , i in xa.data_vars.items():
                for k , v in i.attrs.items():
                    nc_meta['clipc:'+n+"_"+str(k).replace(".","_")] = str(v)[0:25]
            
           
            metadata = [nc_meta]
            
            return metadata
                             
        except Exception:
            self.log("Applying default metadata extraction"+str(traceback.format_exc()))
            self.error=self.error+"Applying default metadata extraction:"+str(traceback.format_exc())
            return super(netcdfProvType, self).extractItemMetadata(data,output_port);
        
        
        
class Nby1Flow(ProvenanceType):
    def __init__(self):
        ProvenanceType.__init__(self)
        self.ports_lookups={}
        

    def apply_derivation_rule(self,event,voidInvocation,oport=None,data=None,iport=None,metadata=None):
    
         
        # reacts on specific events (write, end_invocation_event);
        # the latter is characterised by whether the invocation returned data (voidInvocation)
        if (event=='write'):
            dep=[]
            for x in self.inputconnections:
                if x!=iport and x!='_d4py_feedback':
                    # consume the pending lookup token from each other input port
                    vv=self.ports_lookups[x].pop(0)
                    dep.append(vv)
            # declare the accumulated derivations once, after scanning all ports
            self.setStateDerivations(dep)


        if (event=='end_invocation_event' and voidInvocation==True):

            if data!=None:
                # the invocation produced no output: record a lookup token for
                # this input port so a later 'write' can declare the derivation
                vv=str(abs(make_hash(tuple(iport+str(self.iterationIndex)))))
                if not (iport in self.ports_lookups):
                    self.ports_lookups[iport]=[]

                self.ports_lookups[iport].append(vv)
                self.update_prov_state(vv,None,metadata={"LOOKUP":str(vv)})
                self.discardInFlow()


        if (event=='end_invocation_event' and voidInvocation==False):
                 self.discardInFlow()
                 self.discardState()

The configuration below describes the provenance setup.


In [7]:
selrule = {"ANALYSIS": { 
                         "rules":{ 
                                 "myterm": {
                                            "$lt": 15 }
                                }
                        }
           }

prov_config =  {
                    'provone:User': "aspinuso", 
                    's-prov:description' : "provdemo demokritos",
                    's-prov:workflowName': "demo_ecmwf",
                    's-prov:workflowType': "clipc:combine",
                    's-prov:workflowId'  : "workflow process",
                    's-prov:save-mode'   : 'service'         ,
                    # defines the Provenance Types and Provenance Clusters for the Workflow Components
                    's-prov:componentsType' : 
                                       {'ANALYSIS': {'s-prov:type':(netcdfProvType,),
                                                     's-prov:prov-cluster':'clipc:Combiner'},
                                        'COMBINE':  {'s-prov:type':(netcdfProvType, Nby1Flow,),
                                                     's-prov:prov-cluster':'clipc:Combiner'},
                                        'COLLECTOR1':{'s-prov:prov-cluster':'clipc:DataHandler',
                                                     's-prov:type':(netcdfProvType,)},
                                        'COLLECTOR2':{'s-prov:prov-cluster':'clipc:DataHandler',
                                                     's-prov:type':(netcdfProvType,)},
                                        'STORE':    {'s-prov:prov-cluster':'clipc:DataHandler'}
                                        },
                    's-prov:sel-rules': selrule
                }
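
As mentioned above, the lineage can be stored locally rather than sent to a service. A minimal sketch, assuming 'file' is a valid alternative to the 'service' save mode demonstrated in this notebook:


In [ ]:
# Hedged sketch: file-based storage of the lineage instead of the run-time
# service. 'file' as a save-mode value is an assumption; traces would then
# land under ProvenanceType.PROV_PATH, which is set in the next cell.
prov_config['s-prov:save-mode'] = 'file'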

The REPOS_URL is the target provenance repository.


In [8]:
#Store via service
#ProvenanceType.REPOS_URL='http://127.0.0.1:8082/workflowexecutions/insert'
#ProvenanceType.PROV_EXPORT_URL='http://127.0.0.1:8082/data/'

#ProvenanceType.REPOS_URL='http://snf-3480.ok-kno.grnetcloud.net/prov/workflowexecutions/insert'
#ProvenanceType.PROV_EXPORT_URL='http://snf-3480.ok-kno.grnetcloud.net/prov/data/'

#http://snf-3480.ok-kno.grnetcloud.net:8082/swagger/
ProvenanceType.REPOS_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/workflowexecutions/insert'
ProvenanceType.PROV_EXPORT_URL='http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/data/'

#Store to local path
ProvenanceType.PROV_PATH='./prov-files/'

#Size of the provenance bulk before it is sent to the storage or the sensor
ProvenanceType.BULK_SIZE=1

#ProvenancePE.REPOS_URL='http://climate4impact.eu/prov/workflow/insert'

In [9]:
def createGraphWithProv():
    
    graph=createWorkflowGraph()
    #Location of the remote repository for runtime updates of the lineage traces. Shared among ProvenanceRecorder subtypes

    # Randomly generated unique identifier for the current run
    rid='JUP_COMBINE_'+getUniqueId()

    
    # Finally, provenance enhanced graph is prepared:
    

     
    #Initialise provenance storage to service:
    configure_prov_run(graph, 
                     provImpClass=(ProvenanceType,),
                     username=prov_config['provone:User'],
                     runId=rid,
                     description=prov_config['s-prov:description'],
                     workflowName=prov_config['s-prov:workflowName'],
                     workflowType=prov_config['s-prov:workflowType'],
                     workflowId=prov_config['s-prov:workflowId'],
                     save_mode=prov_config['s-prov:save-mode'],
                     componentsType=prov_config['s-prov:componentsType'],
                     sel_rules=prov_config['s-prov:sel-rules']
                      
                    )
    
    return graph,rid


graph,rid=createGraphWithProv()

display(graph)


Change grouping implementation 
ANALYSIS Original base class: (<class 'dispel4py.core.GenericPE'>,)
ANALYSIS {'rules': {'myterm': {'$lt': 15}}}
 New type: (<class '__main__.netcdfProvType'>, <class '__main__.Analysis'>)
ANALYSIS Original base class: (<class 'dispel4py.core.GenericPE'>,)
ANALYSIS {'rules': {'myterm': {'$lt': 15}}}
 New type: (<class '__main__.netcdfProvType'>, <class '__main__.Analysis'>)
COLLECTOR1 Original base class: (<class 'dispel4py.core.GenericPE'>,)
 New type: (<class '__main__.netcdfProvType'>, <class '__main__.Read'>)
COLLECTOR2 Original base class: (<class 'dispel4py.core.GenericPE'>,)
 New type: (<class '__main__.netcdfProvType'>, <class '__main__.Read'>)
COMBINE Original base class: (<class 'dispel4py.core.GenericPE'>,)
 New type: (<class '__main__.netcdfProvType'>, <class '__main__.Nby1Flow'>, <class '__main__.Combine'>)
STORE Original base class: (<class 'dispel4py.core.GenericPE'>,)
 New type: (<class 'dispel4py.provenance.ProvenanceType'>, <class '__main__.Write'>)
Inputs: {'NewWorkflowRun': [{'input': 'None'}]}
NewWorkflowRun12: BUILDING INITIAL DERIVATION
NewWorkflowRun12: STORING WORKFLOW RUN METADATA
NewWorkflowRun12: Postprocess: (200, 'OK', b'{"inserts": [{"updatedExisting": false, "nModified": 0, "ok": 1.0, "upserted": "JUP_COMBINE_orfeus-as-81375-d31c071a-2610-11e9-94d1-f45c89acf865", "n": 1}], "success": true}')
SimplePE: Processed 1 iteration.
Outputs: {}

Execution with provenance

The following script executes the workflow in single-process mode


In [10]:
runExampleWorkflow(graph)

update_prov_run(rid,save_mode=prov_config['s-prov:save-mode'],dic={'status':'terminated','endTime':str(datetime.datetime.utcnow())})


{'COLLECTOR1': [{'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}, {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}], 'COLLECTOR2': [{'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}, {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}]}
COLLECTOR16: ANDREJ.extractExternalInputDataId
COLLECTOR16: BUILDING INITIAL DERIVATION
COLLECTOR16: Read_Process
COLLECTOR16: {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}
COLLECTOR16: ANDREJ.extractExternalInputDataId
COLLECTOR16: BUILDING INITIAL DERIVATION
COLLECTOR16: Read_Process
COLLECTOR16: {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}
ANALYSIS7: Analysis_process
ANALYSIS7: Checking Selectivity-Rules: {'rules': {'myterm': {'$lt': 15}}}
ANALYSIS7: 10
ANALYSIS7: Analysis_process
ANALYSIS7: Checking Selectivity-Rules: {'rules': {'myterm': {'$lt': 15}}}
ANALYSIS7: 10
COLLECTOR28: ANDREJ.extractExternalInputDataId
COLLECTOR28: BUILDING INITIAL DERIVATION
COLLECTOR28: Read_Process
COLLECTOR28: {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}
COLLECTOR28: ANDREJ.extractExternalInputDataId
COLLECTOR28: BUILDING INITIAL DERIVATION
COLLECTOR28: Read_Process
COLLECTOR28: {'input': ['http://opendap.knmi.nl/knmi/thredds/dodsC/CLIPC/cmcc/SWE/SWE_ophidia-0-10-1_CMCC_GlobSnow-SWE-L3B_monClim_19791001-20080701_1979-2008.nc']}
ANALYSIS9: Analysis_process
ANALYSIS9: Checking Selectivity-Rules: {'rules': {'myterm': {'$lt': 15}}}
ANALYSIS9: 10
ANALYSIS9: Analysis_process
ANALYSIS9: Checking Selectivity-Rules: {'rules': {'myterm': {'$lt': 15}}}
ANALYSIS9: 10
COMBINE10: Combine_process
COMBINE10: Combine_process
COMBINE10: Combine_process
COMBINE10: <xarray.DataArray 'SWE' ()>
array(nan)
Coordinates:
    y        float64 -9.024e+06
    x        float64 -9.024e+06
    time     datetime64[ns] 1994-10-16T12:00:00
Attributes:
    grid_mapping:   lambert_azimuthal_equal_area
    units:          mm
    standard_name:  lwe_thickness_of_surface_snow_amount
    long_name:      Snow Water Equivalent
    _ChunkSize:     [  1 721 721]
COMBINE10: Combine_process
COMBINE10: <xarray.DataArray 'SWE' ()>
array(nan)
Coordinates:
    y        float64 -9.024e+06
    x        float64 -9.024e+06
    time     datetime64[ns] 1994-10-16T12:00:00
Attributes:
    grid_mapping:   lambert_azimuthal_equal_area
    units:          mm
    standard_name:  lwe_thickness_of_surface_snow_amount
    long_name:      Snow Water Equivalent
    _ChunkSize:     [  1 721 721]
STORE11: Write_Function
STORE11: Write_Function
SimplePE: Processed 1 iteration.

 RESULT: {'STORE11': {'location': [{'_d4p': 'data/new_0.nc', 'prov_cluster': 'clipc:DataHandler', 'port': 'location', 'id': 'orfeus-as-81375-ebf1e926-2610-11e9-94d1-f45c89acf865', 'TriggeredByProcessIterationID': 'STORE-orfeus-as-81375-e009d4fc-2610-11e9-94d1-f45c89acf865'}, {'_d4p': 'data/new_1.nc', 'prov_cluster': 'clipc:DataHandler', 'port': 'location', 'id': 'orfeus-as-81375-ec0892d4-2610-11e9-94d1-f45c89acf865', 'TriggeredByProcessIterationID': 'STORE-orfeus-as-81375-ebf1f1be-2610-11e9-94d1-f45c89acf865'}]}}
Inputs: {'UpdateWorkflowRun': [{'input': 'None'}]}
UpdateWorkflowRun14: BUILDING INITIAL DERIVATION
UpdateWorkflowRun14: {'status': 'terminated', 'endTime': '2019-02-01 11:02:53.606705', 'runId': 'JUP_COMBINE_orfeus-as-81375-d31c071a-2610-11e9-94d1-f45c89acf865'}
UpdateWorkflowRun14: {'status': 'terminated', 'endTime': '2019-02-01 11:02:53.606705', 'runId': 'JUP_COMBINE_orfeus-as-81375-d31c071a-2610-11e9-94d1-f45c89acf865', 'type': 'workflow_run'}
UpdateWorkflowRun14: UPDATING WORKFLOW RUN METADATA{'status': 'terminated', 'endTime': '2019-02-01 11:02:53.606705', 'runId': 'JUP_COMBINE_orfeus-as-81375-d31c071a-2610-11e9-94d1-f45c89acf865', 'type': 'workflow_run'}
UpdateWorkflowRun14: Postprocess: (200, 'OK', b'{"inserts": [{"updatedExisting": true, "nModified": 1, "ok": 1.0, "n": 1}], "success": true}')
SimplePE: Processed 1 iteration.
Outputs: {}

2.3 - Visualise Provenance Trace

Read the id of the output to locate the provenance trace on the remote service.

2.3.1 Visualise in S-ProvFlow

The following link opens a local installation of the S-ProvFlow System GUI

2.3.2 Extract from the S-ProvFlow API

The following script extracts the provenance of the last file produced, in standard PROV-XML, and embeds it into the file itself.

Extraction from the API


In [11]:
# read the id of the output to locate its provenance trace
import xml.etree.ElementTree as ET

finalFile = 'data/new_'+str(len(input_data['COLLECTOR1'])-1)+'.nc'
from shutil import copyfile

#copyfile(finalFile, 'data/newA.nc')
ds = xarray.open_dataset(finalFile)
dataid = ds.attrs['id']     #"orfeus-as-73355-c381c282-d422-11e6-ac42-f45c89acf865"
 
print("Extract Trace for dataid: "+dataid)
expurl = urlparse(ProvenanceType.PROV_EXPORT_URL)
connection = httplib.client.HTTPConnection(expurl.netloc)
url="http://"+expurl.netloc+expurl.path+dataid+"/export?level=100&format=xml"
print(url)
connection.request("GET", url)
response = connection.getresponse()
  
print("progress: " + str((response.status, response.reason)))
prov1 = ET.fromstring(response.read())
print('PROV TO EMBED:')
print(str(prov1))


Extract Trace for dataid: df625c9a-2610-11e9-94d1-f45c89acf865
http://ec2-18-197-219-251.eu-central-1.compute.amazonaws.com/data/df625c9a-2610-11e9-94d1-f45c89acf865/export?level=100&format=xml
progress: (200, 'OK')
PROV TO EMBED:
<Element '{http://www.w3.org/ns/prov#}document' at 0xc18d78188>

Embed the PROV document in the NetCDF file, as the 'prov_xml' attribute of a new 'provenance' variable, and print the file's properties. A read-back sketch follows the output.


In [12]:
# create a 'provenance' variable, attach the XML and save to a new file

ds.load()
ds['provenance'] = xarray.DataArray("")

# serialise the element tree to an XML string before embedding;
# str() on an Element would only store its repr, not the document itself
ds['provenance'].attrs['prov_xml'] = ET.tostring(prov1, encoding='unicode')

ds.to_netcdf(str(finalFile+"_PROV"))
ds = xarray.open_dataset(str(finalFile+"_PROV"))
print(ds)


<xarray.Dataset>
Dimensions:                       (bnds: 2, time: 12, x: 721, y: 721)
Coordinates:
  * y                             (y) float64 -9.024e+06 ... 9.024e+06
  * x                             (x) float64 -9.024e+06 ... 9.024e+06
  * time                          (time) datetime64[ns] 1994-10-16T12:00:00 ... 1995-09-16
Dimensions without coordinates: bnds
Data variables:
    time_bnds                     (time, bnds) datetime64[ns] ...
    lambert_azimuthal_equal_area  |S64 ...
    SWE                           (time, y, x) float32 ...
    lat                           (y, x) float32 ...
    lon                           (y, x) float32 ...
    provenance                    object ...
Attributes:
    source:                          SMMR L3 brightness temperatures in EASE ...
    auxiliary_data:                  GLC-2000 derived land classification mas...
    product_version:                 2.0
    summary:                         Snow water equivalent values on 25 km by...
    id:                              df625c9a-2610-11e9-94d1-f45c89acf865
    naming_authority:                fi.fmi
    keywords_vocabulary:             NetCDF Climate and Forecast (CF) Metadat...
    cdm_data_type:                   grid
    project:                         ESA GlobSnow-2
    geospatial_vertical_min:         0.0
    geospatial_vertical_max:         0.0
    geospatial_lat_units:            degrees north
    geospatial_lon_units:            degrees east
    standard_name_vocabulary:        NetCDF Climate and Forecast (CF) Metadat...
    license:                         These data may be redistributed and used...
    sensor:                          SMMR
    spatial_resolution:              25 km
    algorithm:                       FMI assimilation algorithm (Pulliainen 2...
    tracking_id:                     47092dd1-7a86-47b1-952c-3ae3163d3828
    Conventions:                     CF-1.6
    package_references:              ophidia.cmcc.it
    date_published:                  2016-07-19
    date_revised:                    2016-07-19
    institution:                     CMCC
    institution_id:                  CMCC
    institution_url:                 www.cmcc.it
    contact_email:                   ophidia-info@lists.cmcc.it
    creator_name:                    CMCC
    creator_url:                     www.cmcc.it
    creator_email:                   ophidia-info@lists.cmcc.it
    contributor_name:                 
    contributor_role:                 
    platform:                        station
    platform_id:                     NIMBUS
    satellite_algorithm:              
    satellite_sensor:                SMMR
    indata_history:                   
    frequency:                       mon
    cdm_datatype:                    grid
    geospatial_bounds:               POLYGON (35 -180, 85 -180, 85 180, 35 180)
    geospatial_lat_min:              35.0
    geospatial_lat_max:              85.0
    geospatial_lon_min:              -180.0
    geospatial_lon_max:              180.0
    geospatial_lat_resolution:       25 km
    geospatial_lon_resolution:       25 km
    project_id:                      CLIP-C
    activity:                        clipc
    title:                           Snow Water Equivalent, Oct monthly aggre...
    time_coverage_start:             19791001
    product:                         obs_derived
    comment:                          
    references:                       
    package_name:                    ophidia-0-10-1
    date_created:                    20160701
    date_modified:                    
    date_issued:                     20160802
    source_data_id:                  GlobSnow-SWE-L3B
    source_data_id_comment:           
    invar_platform:                  remote sensing
    invar_platform_id:               ESA GlobSnow
    invar_satellite_algorithm:        
    invar_satellite_sensor:          SMMR
    invar_rcm_model_id:               
    invar_rcm_model_realization_id:   
    invar_rcm_model_driver:           
    invar_reanalysis_id:              
    invar_gcm_model_id:               
    invar_experiment_name:            
    invar_ensemble_member:            
    invar_bc_method_id:               
    invar_bc_observation_id:          
    invar_bc_period:                  
    invar_variable_name:             SWE_avg
    reference_period:                1979-2008
    output_frequency:                monClim
    tile:                             
    keywords:                        SWE-avg,climate,index
    invar_tracking_id:                
    contact:                         ophidia-info@lists.cmcc.it
    realisation_id:                   
    variable_name:                   SWE
    history:                          
    domain:                          180E-180W-35N-85N
    time_coverage_end:               20080701
    DODS.strlen:                     0
    DODS_EXTRA.Unlimited_Dimension:  time
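
As a round-trip sketch, the embedded trace can be read back from the new file and deserialised with the prov package (ProvDocument.deserialize accepts an XML string via its content argument; everything else reuses calls already shown above).


In [ ]:
# Hedged sketch: read the embedded provenance back out of the NetCDF file
# and deserialise it into a PROV document object.
from prov.model import ProvDocument

ds_check = xarray.open_dataset(str(finalFile+"_PROV"))
xml_str  = ds_check['provenance'].attrs['prov_xml']

doc = ProvDocument.deserialize(content=xml_str, format='xml')
print(doc.get_provn()[:500])   # first lines of the PROV-N rendering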

Visualise the PROV document in its standard graphical view


In [21]:
import prov
import io
import urllib.request
from IPython.display import Image
from prov.model import ProvDocument, ProvBundle, ProvException, first, Literal
from prov.dot import prov_to_dot

def provURLToPNG(xml,format):
     
    xml_doc = io.BytesIO(urllib.request.urlopen(xml).read())
    doc=ProvDocument.deserialize(xml_doc,format="xml")
    dot = prov_to_dot(doc)
    
    if format=="png":
        dot.write_png('PROV.png')
        return 'PROV.png' 
    if format=="svg":
        dot.write_svg('PROV.svg')
        return 'PROV.svg'
    
    


# visualise the NetCDF provenance as PNG
png_content=provURLToPNG(url,"png")

Image(png_content)

Out[21]:

(PROV graph image: standard graphical rendering of the extracted trace)

The end.

