In [21]:
import pandas as pd
import networkx as nx
import pyensae
import pyquickhelper
In [22]:
example = pd.read_csv("data/web-Google-test.txt",sep = "\t", names=['from','to'])
example
Out[22]:
In [23]:
G = nx.from_pandas_dataframe(example, 'from', 'to',create_using=nx.DiGraph())
In [24]:
import matplotlib as mp
%matplotlib inline
In [25]:
import matplotlib.pyplot as plt
nx.draw_networkx(G, node_color = 'lightgreen', node_size = 1000,arrows=True)
plt.savefig('pictures/graph_example.png')
In [26]:
from operator import add
sc = SparkContext.getOrCreate()
In [27]:
diretorio_base = os.path.join('data')
caminho_teste = os.path.join('web-Google-test.txt')
arquivo_teste = os.path.join(diretorio_base, caminho_teste)
In [28]:
def atualizaRank(listaUrls, rank):
num_urls = len(listaUrls)
rankAtualizado = []
for x in listaUrls:
rankAtualizado.append((x, (rank / num_urls)))
return rankAtualizado
In [29]:
#numPartitions = 2
#rawData = sc.textFile(fileName, numPartitions)
linksGoogle_teste = sc.textFile(arquivo_teste).filter(lambda x: "#" not in x).map(lambda x: x.split("\t"))
In [30]:
linksAgrupados_teste = linksGoogle_teste.groupByKey().cache()
#print(linksAgrupados.take(1))
#for it in linksAgrupados.take(1)[0][1]:
# print(it)
In [31]:
ranks_teste = linksAgrupados_teste.map(lambda url_agrupados: (url_agrupados[0], 1.0))
In [32]:
for x in range(1,2):
# Adiciona ranks inicializados com 1.0 na posição [1][1] da matriz
agrupaIdLinkComRank_teste = linksAgrupados_teste .join(ranks_teste)\
.flatMap(lambda url_rank: atualizaRank(url_rank[1][0], url_rank[1][1]))
# Soma os valores com o mesmo id e adiciona o fator de normalização
ranks_teste = agrupaIdLinkComRank_teste.reduceByKey(add)\
.mapValues(lambda rankFatorD: (rankFatorD * 0.85) + 0.15)
In [33]:
for (link, rank) in ranks_teste.sortBy(lambda x:-x[1]).take(3):
print("ID: %s Ranking: %s." % (link, rank))
Após o teste, vamos aplicar o algoritmo de PageRank na base de dados de referências de páginas da web (arquivo web-Google.txt) encontrada no site da Stanford.
In [311]:
diretorio_base = os.path.join('data')
caminho = os.path.join('web-Google.txt')
arquivo = os.path.join(diretorio_base, caminho)
In [312]:
linksGoogle = sc.textFile(arquivo).filter(lambda x: "#" not in x).map(lambda x: x.split("\t"))
In [313]:
linksAgrupados = linksGoogle.groupByKey().cache()
In [314]:
ranks = linksAgrupados.map(lambda url_agrupados: (url_agrupados[0], 1.0))
In [315]:
for x in range(1,8):
# Adiciona ranks inicializados com 1.0 na posição [1][1] da matriz
agrupaIdLinkComRank = linksAgrupados.join(ranks)\
.flatMap(lambda url_rank: atualizaRank(url_rank[1][0], url_rank[1][1]))
# Soma os valores com o mesmo id e adiciona o fator de normalização
ranks = agrupaIdLinkComRank.reduceByKey(add)\
.mapValues(lambda rankFatorD: (rankFatorD * 0.85) + 0.15)
In [316]:
for (link, rank) in ranks.sortBy(lambda x:-x[1]).take(10):
print("ID: %s Ranking: %s." % (link, rank))