In [1]:
import DSGRN
import time
import sys
class PQNetworkAnalyzer:
def __init__(self, network, P):
self.network = network
self.P_index = network.index(P)
self.parametergraph = DSGRN.ParameterGraph(network)
def AnalyzeParameter(self, parameterindex):
parameter = self.parametergraph.parameter(parameterindex)
dg = DSGRN.DomainGraph(parameter)
md = DSGRN.MorseDecomposition(dg.digraph())
mg = DSGRN.MorseGraph(dg, md)
return mg
def is_FP(self, annotation):
return annotation.startswith("FP")
def is_quiescent_FP(self, annotation):
if self.is_FP(annotation):
digits = [int(s) for s in annotation.replace(",", "").split() if s.isdigit()]
if digits[self.P_index] == 0:
return True
return False
def is_proliferative_FP(self, annotation):
if self.is_FP(annotation):
digits = [int(s) for s in annotation.replace(",", "").split() if s.isdigit()]
if digits[self.P_index] >= 1:
return True
return False
def AnalyzeMorseGraph(self, mg):
mg_poset = mg.poset()
stable_annotations = [ mg.annotation(i)[0] for i in range(0,mg.poset().size()) if len(mg.poset().children(i)) == 0]
monostable = len(stable_annotations) == 1
quiescent = any( self.is_quiescent_FP(annotation) for annotation in stable_annotations )
proliferative = any( self.is_proliferative_FP(annotation) for annotation in stable_annotations )
if monostable and quiescent:
return 'Q'
if monostable and proliferative:
return 'P'
if quiescent and proliferative:
return 'B'
if quiescent:
return 'q'
if proliferative:
return 'p'
return 'O'
def Classify(self, parameterindex):
analyzed_param = self.AnalyzeParameter(parameterindex)
ret_val = self.AnalyzeMorseGraph(analyzed_param)
return ret_val
def topological_sort(graph):
"""
Return list of vertices in (reverse) topologically sorted order
"""
result = []
explored = set()
dfs_stack = [ (v,0) for v in range(0,graph.num_vertices())]
while dfs_stack:
(v,i) = dfs_stack.pop()
if (v,i) in explored: continue
explored.add((v,i))
if i == 0: # preordering visit
dfs_stack.extend([(v,1)] + [ (u,0) for u in graph.unlabelled_adjacencies(v) if u != v ])
elif i == 1: # postordering visit
result.append(v)
return result
def count_paths(graph):
"""
returns card{ (u,v) : source(u) & target(v) & there is an allowed path from u to v}
"""
ts = topological_sort(graph)
paths = {}
result = 0
for v in ts:
paths[v] = sum([ paths[u] for u in graph.unlabelled_adjacencies(v) if u != v]) + ( 1 if v == graph.final() else 0)
if v == graph.initial(): result += paths[v]
return result
class PartialPathAutomata:
def __init__(self, network, S, P):
self.network = network
self.analyzer = PQNetworkAnalyzer(self.network, P)
self.query = DSGRN.NewComputeSingleGeneQuery(network,S,lambda pi : self.analyzer.Classify(pi))
def __call__(self,reduced_parameter_index):
searchautomaton = self.query(reduced_parameter_index)
start = searchautomaton.add_vertex()
stop = searchautomaton.final()
for i in range(0,searchautomaton.num_vertices()):
if i != stop: searchautomaton.add_edge(start,i,' ')
if i != start and i != stop:
label = self.analyzer.Classify(self.query.full_parameter_index(reduced_parameter_index,i))
searchautomaton.add_edge(i,stop,label)
searchautomaton.set_initial(start)
searchautomaton.set_final(stop)
return searchautomaton
class FullPathAutomata:
def __init__(self, network, S, P):
self.network = network
self.analyzer = PQNetworkAnalyzer(self.network, P)
self.query = DSGRN.NewComputeSingleGeneQuery(network,S,lambda pi : self.analyzer.Classify(pi))
def __call__(self,reduced_parameter_index):
searchautomaton = self.query(reduced_parameter_index)
return searchautomaton
class ReducedParameterGraphQuery:
def __init__(self, searchautomata, queryautomaton):
self.searchautomata = searchautomata
self.queryautomaton = queryautomaton
def __call__(self,reduced_parameter_index):
(intersection, proj) = DSGRN.NFA.intersect(self.queryautomaton, self.searchautomata(reduced_parameter_index))
return count_paths(intersection)
def num_paths(self):
return count_paths(self.searchautomata(0))
def WorkJob(network_specification_file, partial_hysteresis_output_file, partial_resettable_output_file,
full_hysteresis_output_file, full_resettable_output_file, starting_rpi, ending_rpi, S, P ):
def RunQueries(query, filename):
start_time = time.time()
result = 0
for rpi in range(starting_rpi, ending_rpi):
number_of_matches = query(rpi)
# DSGRN.LogToSTDOUT("Reduced parameter " + str(rpi) + " has " + str(number_of_matches) + " matches for query " + query.__class__.__name__)
result += number_of_matches
if (rpi - starting_rpi) % 1 == 0:
DSGRN.LogToSTDOUT("Processed from " + str(starting_rpi) + " to " + str(rpi) + " out of " + str(ending_rpi))
normalization = (ending_rpi - starting_rpi)*query.num_paths()
with open(filename, 'w') as outfile:
outfile.write(str(result) + " " + str(normalization) + "\n")
with open(filename + ".log", 'w') as outfile:
outfile.write(str(time.time() - start_time) + '\n')
# Queries to run
network = DSGRN.Network(network_specification_file)
DSGRN.LogToSTDOUT("Parameter graph has " + str(DSGRN.ParameterGraph(network).size()) + " parameters. ")
PPA = PartialPathAutomata(network, S, P)
FPA = FullPathAutomata(network, S, P)
HysA = DSGRN.CompileRegexToNFA("Q(Q|q)*B+(P|p)*P")
ResA = DSGRN.CompileRegexToNFA("Q(Q|q)*B+")
PHQ = ReducedParameterGraphQuery(PPA, HysA)
PRQ = ReducedParameterGraphQuery(PPA, ResA)
FHQ = ReducedParameterGraphQuery(FPA, HysA)
FRQ = ReducedParameterGraphQuery(FPA, ResA)
RunQueries(PHQ, partial_hysteresis_output_file)
RunQueries(PRQ, partial_resettable_output_file)
RunQueries(FHQ, full_hysteresis_output_file)
RunQueries(FRQ, full_resettable_output_file)
In [2]:
%%time
spec = """
X : (X+Y)
Y : (X+Z)
Z : X
"""
net = DSGRN.Network(spec)
N = DSGRN.ParameterGraph(net).size()//len(DSGRN.ParameterGraph(net).factorgraph(0))
WorkJob(spec, "pho.txt","pro.txt","fho.txt","fro.txt",0,N,'X','Z')
In [ ]: