In [19]:
from graphframes import *
from pyspark.sql import functions as F
from pyspark import Row
#regular Python packs
import os
import re
import xml.etree.ElementTree as ET
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# paths
# Root folder of the XBRL taxonomy files. Set the XBRL_FSA_PATH environment
# variable to point elsewhere instead of editing this machine-specific default.
PATH = os.environ.get(
    "XBRL_FSA_PATH",
    "/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/tax/XBRL20130401/20130401/fsa",
)
In [2]:
# Directory listing of the taxonomy folder. os.listdir() returns entries in
# filesystem order, so any positional index into `files` may pick a different
# file on another machine.
files = [entry for entry in os.listdir(PATH)]
print(files)
In [3]:
# Load one taxonomy file as an RDD of text lines.
# NOTE(review): the file is chosen by position in the os.listdir() result,
# whose order is filesystem-dependent; selecting it by name would be reproducible.
FILE_INDEX = 13
rdd = sc.textFile(name=os.path.join(PATH, files[FILE_INDEX]))
In [4]:
# Peek at the first raw lines of the file.
rdd.take(num=10)
Out[4]:
In [5]:
# Tokenise each line on whitespace and drop lines that had no tokens at all.
cleanRdd = (rdd
    .map(lambda line: line.split())
    .filter(lambda tokens: len(tokens) > 0))
In [6]:
def cleanupRdd(rdd, string, keys=("href", "label", "type")):
    """Convert the tokenised tags whose first token is `string` into a DataFrame.

    Each element of `rdd` is indexed as a token list: element 0 is the tag opener
    (e.g. "<link:loc") and the remaining tokens are its `name="value"` attributes
    (presumably the output of `line.split()`; confirm against the caller).

    For every matching element:
      1. strip `prefix:` namespace prefixes, a leading `<prefix:` and a trailing `/>`;
      2. drop double quotes;
      3. split each `name=value` token on its FIRST "=" into a dict entry;
      4. keep only the attribute names listed in `keys`;
      5. wrap the result in a `Row`.

    NOTE(review): the `\\w+:` pattern also removes scheme prefixes such as `http:`
    that appear inside attribute values.

    Parameters:
        rdd: RDD of token lists.
        string: exact first token selecting which tags to parse.
        keys: attribute names (after prefix stripping) to keep; the default
            matches the locator attributes href/label/type.

    Returns:
        A Spark DataFrame built with `.toDF()` from one Row per matching tag.
    """
    keep = frozenset(keys)
    return (rdd
        .filter(lambda x: x[0] == string)
        .map(lambda x: [re.sub(r"\w+:|<\w+:|/>", repl="", string=i) for i in x[1:]])
        .map(lambda x: [re.sub(r'"', repl="", string=i) for i in x])
        # maxsplit=1: a value that itself contains "=" must not yield a 3-item
        # sequence, which would make dict() raise ValueError.
        .map(lambda x: dict(element.split('=', 1) for element in x if "=" in element))
        .map(lambda x: {k: v for k, v in x.items() if k in keep})
        .map(lambda x: Row(**x))
        .toDF()
    )
In [7]:
# Vertices: one row per <link:loc> tag, keeping the href/label/type attributes.
# Cached because several actions reuse this frame (count here, show below, and
# the GraphFrame built from it); otherwise each one re-parses the raw text file.
vert = cleanupRdd(cleanRdd, "<link:loc").cache()
vert.count()
Out[7]:
In [8]:
# Print every vertex row: n is the full row count, so show() does not stop at its default limit.
vert.show(n=vert.count())
In [9]:
# Edges: one row per <link:definitionArc> tag, keeping from/to/order/type/arcrole.
# Same token-cleaning steps as cleanupRdd, but with the arc attribute names.
# NOTE(review): the `\w+:` pattern also strips `http:` from URI-valued attributes such as arcrole.
edges = (cleanRdd
    .filter(lambda x: x[0] == "<link:definitionArc")
    # strip `prefix:` namespace prefixes, a leading `<prefix:` and a trailing `/>`
    .map(lambda x: [re.sub(r"\w+:|<\w+:|/>", repl="", string=i) for i in x[1:]])
    .map(lambda x: [re.sub(r'"', repl="", string=i) for i in x])
    # split on the first "=" only, so a value containing "=" cannot break dict()
    .map(lambda x: dict(element.split('=', 1) for element in x if "=" in element))
    .map(lambda x: {k: v for k, v in x.items() if k in ["from", "to", "order", "type", "arcrole"]})
    .map(lambda x: Row(**x))
    .toDF()
    .cache()  # reused by count(), show() and the GraphFrame below
)
d = edges.count()
print(d)
# NOTE(review): "order" is a string column here, so "10" sorts before "2".
edges.orderBy("from", "order").show(d)
In [10]:
# Rename to the column names GraphFrames looks up: `src`/`dst` on edges, `id` on vertices.
e = edges.withColumnRenamed("from", "src").withColumnRenamed("to", "dst")
v = vert.withColumnRenamed("label", "id")
In [11]:
# Build the taxonomy graph from the renamed vertex (v) and edge (e) frames.
graph = GraphFrame(v, e)
In [12]:
# Vertices ranked by number of outgoing edges, largest first, with untruncated ids.
graph.outDegrees.sort(F.desc("outDegree")).show(truncate=False)
In [13]:
# Multiplicity of each vertex id; a count above 1 means the same label was
# produced by more than one <link:loc> tag.
iDdf = graph.vertices.select("id")
(iDdf
    .groupBy("id")
    .count()
    .show())
In [21]:
# Outgoing edges of the "Assets" vertex. A bare DataFrame expression only renders
# its schema repr in the notebook (unless Spark eager evaluation is enabled), so
# call show() to actually materialise the matching rows.
graph.edges.filter(F.col("src") == "Assets").show(truncate=False)
Out[21]:
In [ ]:
# networkx copy of the GraphFrame edge list (collected to the driver via toPandas()).
# from_pandas_dataframe was removed in networkx 2.0 in favour of from_pandas_edgelist;
# use whichever this networkx version provides (same leading positional arguments).
# No create_using is passed, so networkx's default (undirected) graph type is used.
from_edges = (nx.from_pandas_edgelist if hasattr(nx, "from_pandas_edgelist")
              else nx.from_pandas_dataframe)
G = from_edges(graph.edges.toPandas(), "src", "dst")
In [24]:
# Default drawing: networkx uses a spring layout when no positions are supplied;
# computing that layout explicitly gives the same result.
nx.draw(G, pos=nx.spring_layout(G))
plt.show()
In [25]:
# Same graph with nodes scattered by a random layout.
nx.draw(G, pos=nx.random_layout(G))
plt.show()
In [254]:
# Same graph positioned by a spectral layout.
nx.draw(G, pos=nx.spectral_layout(G))
plt.show()
In [20]:
# Toy example: a small weighted edge list in pandas loaded into networkx.
# Uses its own names (df_demo / G_demo) so it does not overwrite the XBRL graph `G`.
r = np.random.RandomState(seed=5)
# randint's upper bound is exclusive, so (1, 11) covers the same 1..10 range as
# the deprecated random_integers(1, 10).
ints = r.randint(1, 11, size=(3, 2))
a = ['A', 'B', 'C']
b = ['D', 'A', 'E']
df_demo = pd.DataFrame(ints, columns=['weight', 'cost'])
df_demo[0] = a
df_demo['b'] = b
# from_pandas_dataframe was removed in networkx 2.0; prefer from_pandas_edgelist.
from_edges = (nx.from_pandas_edgelist if hasattr(nx, "from_pandas_edgelist")
              else nx.from_pandas_dataframe)
G_demo = from_edges(df_demo, 0, 'b', ['weight', 'cost'])
nx.draw(G_demo)
plt.show()
In [ ]:
import networkx as nx
def hierarchy_pos(G, root, width=1., vert_gap=0.2, vert_loc=0, xcenter=0.5):
    '''Compute (x, y) positions that lay out the part of G reachable from `root` as a tree.

    If there is a cycle that is reachable from root, then result will not be a hierarchy.
    A node reached a second time keeps the position from its first visit.

    G: the graph (anything with a ``neighbors(node)`` method, e.g. a networkx graph)
    root: the root node of current branch
    width: horizontal space allocated for this branch - avoids overlap with other branches
    vert_gap: gap between levels of hierarchy
    vert_loc: vertical location of root
    xcenter: horizontal location of root

    Returns a dict mapping every visited node to an (x, y) tuple, usable as the
    ``pos`` argument of ``nx.draw``.
    '''
    def h_recur(G, root, width=1., vert_gap=0.2, vert_loc=0, xcenter=0.5,
                pos=None, parent=None, parsed=None):
        # A fresh visited-set per top-level call: a mutable default would be
        # shared across calls and make every later call return None.
        if parsed is None:
            parsed = set()
        if pos is None:
            pos = {}
        if root not in parsed:
            parsed.add(root)
            pos[root] = (xcenter, vert_loc)
            # list(): networkx >= 2.0 returns an iterator, which has no len()/remove().
            neighbors = list(G.neighbors(root))
            # In a directed graph the parent is not among the successors; only
            # drop it when present instead of raising ValueError.
            if parent is not None and parent in neighbors:
                neighbors.remove(parent)
            if len(neighbors) != 0:
                dx = width / len(neighbors)
                nextx = xcenter - width / 2 - dx / 2
                for neighbor in neighbors:
                    nextx += dx
                    pos = h_recur(G, neighbor, width=dx, vert_gap=vert_gap,
                                  vert_loc=vert_loc - vert_gap, xcenter=nextx, pos=pos,
                                  parent=root, parsed=parsed)
        return pos

    # Forward the caller's layout arguments (previously they were silently replaced by defaults).
    return h_recur(G, root, width=width, vert_gap=vert_gap, vert_loc=vert_loc, xcenter=xcenter)
In [ ]: