# In[54]:
#from prov.model import ProvDocument
#from prov.dot import prov_to_dot
#from IPython.display import Image
#from prov.model import (
# PROV_ACTIVITY, PROV_AGENT, PROV_ALTERNATE, PROV_ASSOCIATION,
# PROV_ATTRIBUTION, PROV_BUNDLE, PROV_COMMUNICATION, PROV_DERIVATION,
# PROV_DELEGATION, PROV_ENTITY, PROV_GENERATION, PROV_INFLUENCE,
# PROV_INVALIDATION, PROV_END, PROV_MEMBERSHIP, PROV_MENTION,
# PROV_SPECIALIZATION, PROV_START, PROV_USAGE, Identifier,
# PROV_ATTRIBUTE_QNAMES, sorted_attributes, ProvException
import prov.model as prov
import six
import itertools
# Prefix -> namespace URI table registered on the PROV documents built below.
ns_dict = {
    'prov': 'http://www.w3.org/ns/prov#',
    # Fixed: the URI used to end in '#>' (stray '>' left over from an
    # angle-bracketed IRI), which made every 'var:' name expand wrongly.
    'var': 'http://openprovenance.org/var#',
    'vargen': 'http://openprovenance.org/vargen#',
    'tmpl': 'http://openprovenance.org/tmpl#',
    'foaf': 'http://xmlns.com/foaf/0.1/',
    'ex': 'http://example.org/',
    'orcid': 'http://orcid.org/',
    #document.set_default_namespace('http://example.org/0/')
    'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
    'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
    'xsd': 'http://www.w3.org/2001/XMLSchema#',
    'ex1': 'http://example.org/1/',
    'ex2': 'http://example.org/2/',
}

# Substitution table handed to instantiate_template: template identifier
# (as "prefix:localpart" text) -> concrete value that replaces it.
instance_dict = {
    'var:author': 'orcid:0000-0002-3494-120X',
    'var:value': 'A Little Provenance Goes a Long Way',
    'var:name': 'Luc Moreau',
    'var:quote': 'ex:quote1',
}

# Empty document carrying all namespaces above.
# NOTE(review): set_namespaces is defined further down in this file (next
# notebook cell), so that cell has to be executed before this line runs.
doc0 = set_namespaces(ns_dict, prov.ProvDocument())
# In[55]:
def set_namespaces(ns, prov_doc):
    """Register the namespaces in *ns* on *prov_doc* and return *prov_doc*.

    ns: either a dict mapping prefix -> URI (each pair is passed to
        ``add_namespace(prefix, uri)``), or any other iterable whose items
        are passed one by one to ``add_namespace(item)``.
    prov_doc: object providing ``add_namespace``; it is modified in place.
    """
    if not isinstance(ns, dict):
        # Non-dict input: every item is handed over as a single argument.
        for namespace in ns:
            prov_doc.add_namespace(namespace)
        return prov_doc

    for prefix, uri in ns.items():
        prov_doc.add_namespace(prefix, uri)
    return prov_doc
def make_prov(prov_doc):
    """Add the example template bundle to *prov_doc* and return *prov_doc*.

    The bundle 'vargen:bundleid' receives two entities ('var:quote' and
    'var:author') and one attribution linking the quote to the author.
    """
    # for enes data ingest use case: use information from dkrz_forms/config/workflow_steps.py
    template = prov_doc.bundle('vargen:bundleid')
    #bundle.set_default_namespace('http://example.org/0/')

    # The quote entity only carries its (templated) value.
    quote_attrs = (
        ('prov:value', 'var:value'),
    )
    template.entity('var:quote', quote_attrs)

    # The author entity is typed as a person and carries a (templated) name.
    author_attrs = (
        (prov.PROV_TYPE, "prov:Person"),
        ('foaf:name', 'var:name'),
    )
    template.entity('var:author', author_attrs)

    template.wasAttributedTo('var:quote', 'var:author')
    return prov_doc
def save_and_show(filename):
    """Fill the module-level ``doc0`` via make_prov, print and save it.

    The PROV-N text of the resulting document is printed and written to
    *filename*; the document itself is returned.
    """
    document = make_prov(doc0)
    provn_text = document.get_provn()

    print(provn_text)
    with open(filename, 'w') as provn_file:
        provn_file.write(provn_text)

    print("------")
    print("saved in file:",filename)
    return document
# Build the template document (from doc0), print it and write its PROV-N
# serialization to disk.
# NOTE(review): the output path is hard-coded and user-specific.
doc1 = save_and_show('/home/stephan/test/xxxx.provn')
# In[42]:
# Copy the namespaces registered on doc1 onto a fresh, empty document.
# NOTE(review): presumably doc1.namespaces is a non-dict iterable of namespace
# objects, so set_namespaces takes its one-argument branch - verify.
tst = doc1.namespaces
docx = set_namespaces(tst,prov.ProvDocument())
# In[113]:
# To Do: condense matching functionality into one function/class
def match_qn(qn, mdict):
    """Look up a qualified name in *mdict* by its "prefix:localpart" text.

    qn: object exposing ``localpart`` and ``namespace.prefix``.
    Returns whatever ``match`` returns for the assembled key: the mapped
    value if the key is present, otherwise the key text itself.
    """
    local_name = qn.localpart
    key = ":".join((qn.namespace.prefix, local_name))
    return match(key, mdict)
def match(eid, mdict):
    """Return ``mdict[eid]`` if *eid* is a key of *mdict*, else *eid* itself.

    A short trace line is printed for either outcome.
    """
    if eid not in mdict:
        print("No Match: ",eid)
        return eid

    print("Match: ",eid)
    return mdict[eid]
def attr_match(attr_list, mdict):
    """Translate a list of (name, value) attribute pairs through *mdict*.

    Each name is resolved with ``match_qn`` and each value with ``match``.
    The result is a dict, so a later pair with the same resolved name
    replaces an earlier one. The resulting dict is printed and returned.
    """
    resolved = {}
    for attr_name, attr_value in attr_list:
        # Resolve the name first, then the value (keeps the trace output order).
        new_name = match_qn(attr_name, mdict)
        new_value = match(attr_value, mdict)
        resolved[new_name] = new_value
    print("Attr dict:",resolved)
    return resolved
#---------------------------------------------------------------
def instantiate_template(prov_doc, instance_dict):
    """Create a new document from a template by substituting identifiers.

    For every bundle of *prov_doc* a bundle with the same identifier is
    created in a fresh document. Element records are re-created with their
    identifier and attributes translated through *instance_dict*; supported
    binary relations are re-created between the translated endpoints.

    prov_doc: template document; only its bundles are processed
        (no nested bundle support).
    instance_dict: maps "prefix:localpart" strings to replacement values.

    Returns the newly built document. Records of unsupported relation types
    are reported with a warning and skipped.
    """
    # Relation type -> name of the bundle method that re-creates it.
    relation_builders = {
        prov.PROV_ATTRIBUTION: 'wasAttributedTo',
        prov.PROV_ASSOCIATION: 'wasAssociatedWith',
        prov.PROV_DERIVATION: 'wasDerivedFrom',
        prov.PROV_DELEGATION: 'actedOnBehalfOf',
        prov.PROV_GENERATION: 'wasGeneratedBy',
        prov.PROV_INFLUENCE: 'wasInfluencedBy',
        prov.PROV_COMMUNICATION: 'wasInformedBy',
    }

    def _resolve(qname):
        # Optional relation arguments may be absent; pass None through.
        if qname is None:
            return None
        return match_qn(qname, instance_dict)

    new_doc = set_namespaces(prov_doc.namespaces, prov.ProvDocument())
    blist = list(prov_doc.bundles)
    # no nested bundle support for now !
    if not blist:
        print("Attention: no bundles to transform")

    for bundle in blist:
        new_bundle = new_doc.bundle(bundle.identifier)

        # Collected per bundle so records never leak into another bundle.
        nodes = []
        relations = []
        for rec in bundle.records:
            if rec.is_element():
                nodes.append(rec)
            elif rec.is_relation():
                relations.append(rec)
            else:
                print("Warning: Unrecognized element type: ",rec)

        for rec in nodes:
            props = attr_match(rec.attributes, instance_dict)
            neid = match(six.text_type(rec.identifier), instance_dict)
            # NOTE(review): every element is re-created as an entity,
            # whatever the type of the template record was.
            new_bundle.entity(prov.Identifier(neid), other_attributes=props)

        for rel in relations:
            rel_type = rel.get_type()
            builder_name = relation_builders.get(rel_type)
            if builder_name is None:
                print("Warning! This relation is not yet supported. typeinfo: ",rel_type)
                continue
            # Only the first two formal arguments (the two endpoints) are
            # carried over; several relation types have more than two.
            first, second = rel.args[:2]
            build = getattr(new_bundle, builder_name)
            new_rel = build(_resolve(first), _resolve(second))
            print(new_rel)

    return new_doc
# Instantiate the template held in doc1 with the values from instance_dict,
# then print the template and the instantiated document one after the other.
new = instantiate_template(doc1,instance_dict)
print(doc1.get_provn())
#print(new)
print(new.get_provn())
# In[102]:
# Scratch cell inspecting the object bound to `new`.
# NOTE(review): in file order `new` is the document returned by
# instantiate_template; the attributes read here (get_type, FORMAL_ATTRIBUTES,
# label, value, args) look like record attributes, so this cell presumably
# dates from a run where `new` was a single record - confirm or remove.
# The comparison on the first line is evaluated but its result is discarded.
new.get_type() == prov.PROV_ATTRIBUTION
print(new.FORMAL_ATTRIBUTES)
print(new.label)
print(new.value)
print(new.args)
print(new.identifier)
# In[ ]:
def gen_graph_model(prov_doc):
    """Translate the relations of *prov_doc* into graph relationships.

    Every element record returned by ``prov_doc.get_records()`` becomes a
    ``Node``; every other record with at least two endpoints becomes a
    ``Relationship`` between its first two endpoints, named after the record
    type. For records with more than two endpoints an additional
    relationship is added from the first endpoint to each further one.

    NOTE(review): ``Node`` and ``Relationship`` are not imported in the
    visible part of this file (presumably py2neo) - they must be in scope.

    Returns the list of created relationships.
    """
    node_map = {}        # uri -> Node, shared by the helpers below
    count = [0]          # boxed counter so the nested helpers can bump it
    use_labels = True
    show_nary = True

    def _new_node(uri, node_label):
        # Create a node with the next sequential id and remember it by uri.
        count[0] += 1
        node = Node('n%d' % count[0], label=node_label, URL=uri)
        node_map[uri] = node
        return node

    def _add_node(record):
        # Node for an element record; the label depends on use_labels.
        if use_labels:
            if record.label == record.identifier:
                node_label = '"%s"' % six.text_type(record.label)
            else:
                # Label and identifier differ: show both, comma separated.
                node_label = (six.text_type(record.label) + ',' +
                              six.text_type(record.identifier))
        else:
            node_label = six.text_type(record.identifier)
        return _new_node(record.identifier.uri, node_label)

    def _add_generic_node(qname):
        # Node for a name that is referenced but has no element record.
        return _new_node(qname.uri, '"%s"' % six.text_type(qname))

    def _get_node(qname):
        # Look up (or lazily create) the node for a qualified name.
        if qname is None:
            print("ERROR: _get_node called for empty node")
            return None
        uri = qname.uri
        if uri not in node_map:
            _add_generic_node(qname)
        return node_map[uri]

    relations = []
    for rec in prov_doc.get_records():
        if rec.is_element():
            _add_node(rec)
        else:
            # Saving the relations for later processing
            relations.append(rec)

    neo_rels = []
    for rec in relations:
        # skipping empty records
        if not rec.args:
            continue
        # picking element nodes
        nodes = [
            value for attr_name, value in rec.formal_attributes
            if attr_name in prov.PROV_ATTRIBUTE_QNAMES
        ]
        if len(nodes) < 2:  # too few elements for a relation?
            continue  # cannot draw this
        source, target = nodes[0], nodes[1]
        if source is None or target is None:
            # An unset endpoint cannot be turned into a graph node.
            continue

        # The binary relationship is emitted for every record, with or
        # without extra attributes / n-ary elements.
        rel_name = six.text_type(rec.get_type())
        neo_rels.append(
            Relationship(_get_node(source), rel_name, _get_node(target))
        )

        if show_nary and len(nodes) > 2:
            # Further endpoints are linked to the first one.
            for extra in nodes[2:]:
                if extra is not None:
                    neo_rels.append(
                        Relationship(_get_node(source), "...rel_name",
                                     _get_node(extra))
                    )
    return neo_rels