In [1]:
import requests
import lxml.html as lh

# Root of the GDELT 1.0 event archive; index.html links every downloadable file.
gdelt_base_url = 'http://data.gdeltproject.org/events/'

# A timeout keeps the cell from hanging forever on a stalled connection.
page = requests.get(gdelt_base_url + 'index.html', timeout=60)
# Fail loudly on an HTTP error instead of parsing an error page into a bogus
# (possibly empty) file list that later cells would silently work from.
page.raise_for_status()
doc = lh.fromstring(page.content)
link_list = doc.xpath("//*/ul/li/a/@href")
# Keep only links whose first four characters are digits (the dated data files).
# x[:4].isdigit() works whether lxml hands back str or unicode values.
file_list = [x for x in link_list if x[:4].isdigit()]
In [2]:
import re
from datetime import datetime, timedelta

# How many of the most recent daily export files to analyse.
NUM_EXPORT_FILES = 6
# Daily event exports are named YYYYMMDD.export.CSV.zip.
EXPORT_NAME_RE = re.compile(r"\d{8}\.export\.CSV\.zip$")

today = datetime.today()
today_string = today.strftime("%Y%m%d")

# Newest export files dated today or earlier. The YYYYMMDD prefix sorts
# lexically in date order, so a reverse string sort is newest-first. This is
# bounded by the size of file_list, so it terminates even when fewer than
# NUM_EXPORT_FILES exports are listed (set() drops duplicate links).
export_files = sorted(
    set(name for name in file_list
        if EXPORT_NAME_RE.match(name) and name[:8] <= today_string),
    reverse=True)
tmpfiles = export_files[:NUM_EXPORT_FILES]
if not tmpfiles:
    raise ValueError("no daily export files found in the GDELT index")
In [3]:
import os
import os.path
import urllib
import zipfile

# Local cache for downloaded export zips (relative to the notebook's cwd).
DATA_DIR = 'data'
if not os.path.isdir(DATA_DIR):
    os.mkdir(DATA_DIR)

# One RDD per archive member; they are unioned once at the end rather than
# nesting a new union for every file.
# NOTE(review): this is the Python 2 API (urllib.urlretrieve, str.split on the
# raw member contents); Python 3 would need urllib.request and a bytes decode.
member_rdds = []
for download_file in tmpfiles:
    zipfilename = os.path.join(DATA_DIR, download_file)
    if not os.path.isfile(zipfilename):
        # Download under a temporary name and rename only after it completes,
        # so an interrupted download never leaves a truncated zip that a re-run
        # would mistake for a cached copy.
        partial_filename = zipfilename + '.part'
        urllib.urlretrieve(url=gdelt_base_url + download_file,
                           filename=partial_filename)
        os.rename(partial_filename, zipfilename)
    # The context manager closes the archive even if reading a member fails.
    with zipfile.ZipFile(file=zipfilename, mode='r') as zf:
        for info in zf.infolist():
            # The whole member is read on the driver and shipped to Spark;
            # each line becomes a list of tab-separated fields.
            raw_csv = zf.read(info.filename)
            member_rdds.append(
                sc.parallelize(raw_csv.split('\n'))
                  .map(lambda line: line.split('\t')))

# rdd stays None when nothing was loaded, as before.
rdd = sc.union(member_rdds) if member_rdds else None
In [4]:
import graphframes
import random

# Field positions in a tab-split daily export row (rows with any other field
# count are dropped as malformed).
# NOTE(review): per the GDELT 1.0 event codebook these appear to be
# Actor1CountryCode (7), Actor2CountryCode (17) and EventRootCode (28), with
# root code "19" being CAMEO "Fight" — confirm against the codebook.
NUM_FIELDS = 58
SRC_COL = 7
DST_COL = 17
EVENT_ROOT_COL = 28
EVENT_ROOT_CODE = "19"

# (source code, target code) pairs for every well-formed matching event.
# Fields come from str.split, so they are never None; "" is the only
# "missing" value to filter out.
data = (rdd
        .filter(lambda line: len(line) == NUM_FIELDS)
        .filter(lambda line: line[SRC_COL] != "" and line[DST_COL] != "")
        .filter(lambda line: line[EVENT_ROOT_COL] == EVENT_ROOT_CODE)
        .map(lambda line: (line[SRC_COL], line[DST_COL])))
data.cache()
print("data cnt: %d" % data.count())

# Every code that appears on either end of a pair becomes a vertex.
keys = data.flatMap(lambda pair: pair).distinct()

# NOTE(review): SQLContext and sc are not defined in this notebook; they are
# expected to be pre-defined by the kernel (e.g. the pyspark shell) — confirm.
# Presumably this must run before .toDF(), which PySpark attaches to RDDs when
# a SQL context/session is created.
sqlContext = SQLContext(sc)
vertices = keys.map(lambda x: (x,)).toDF(["id"])
# data already holds (src, dst) tuples, so it maps straight onto the edge schema.
edge = data.toDF(["src", "dst"])
g = graphframes.GraphFrame(vertices, edge)
In [ ]:
# PageRank over the GraphFrame g; maxIter runs a fixed number of iterations.
# NOTE(review): the GraphFrames user guide example uses resetProbability=0.15;
# a value this close to 0 is unusual — confirm it is intentional.
results = g.pageRank(
    resetProbability=0.0001,
    maxIter=20,
)
In [ ]:
# List vertices by descending PageRank score; show() prints the first rows
# (20 by default).
(results.vertices
    .select("id", "pagerank")
    .orderBy("pagerank", ascending=False)
    .show())
GraphFrames documentation (PageRank section): http://graphframes.github.io/user-guide.html#pagerank
GraphFrames package on Spark Packages: https://spark-packages.org/package/graphframes/graphframes
In [ ]: