Install Jupyter

pip install jupyter

Set up the Jupyter Spark environment:

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
pyspark --master local[2] --packages graphframes:graphframes:0.3.0-spark1.6-s_2.10
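
Once the notebook is up, a quick sanity check (a minimal sketch, not part of the original session) confirms that the SparkContext exists and the GraphFrames package was resolved:

print sc.version          # SparkContext created by the pyspark launcher
import graphframes        # fails if the --packages download did not work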

Get the file list

In [1]:
import requests
import lxml.html as lh

gdelt_base_url = 'http://data.gdeltproject.org/events/'

# Scrape the index page and keep only the dated daily archives
# (their file names start with YYYYMMDD)
page = requests.get(gdelt_base_url + 'index.html')
doc = lh.fromstring(page.content)
link_list = doc.xpath("//*/ul/li/a/@href")

file_list = [x for x in link_list if x[0:4].isdigit()]
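
To verify the scrape worked, peek at the result (an illustrative check; the exact entries depend on when you run it):

print len(file_list)
print file_list[:3]   # entries look like 'YYYYMMDD.export.CSV.zip'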

In [2]:
from datetime import datetime, timedelta

# Walk backwards from today and keep the six most recent dates that
# actually have an export archive on the server
today = datetime.today()
tmpfiles = []
while len(tmpfiles) < 6:
    date_string = today.strftime("%Y%m%d") + ".export.CSV.zip"
    if date_string in file_list:
        tmpfiles.append(date_string)
    today -= timedelta(days=1)
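
At this point tmpfiles should hold the six most recent daily archives (a quick check, not in the original notebook):

print tmpfiles   # six names of the form 'YYYYMMDD.export.CSV.zip'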

In [3]:
import os.path
import urllib
import zipfile

if not os.path.isdir('data'):
    os.mkdir('data')

# Download each day's archive (skipping files already on disk), read the
# CSV inside the zip, and union everything into a single RDD of
# tab-separated event records
rdd = None
for download_file in tmpfiles:
    zipfilename = './data/' + download_file
    if not os.path.isfile(zipfilename):
        urllib.urlretrieve(url=gdelt_base_url + download_file,
                           filename=zipfilename)
    zf = zipfile.ZipFile(file=zipfilename, mode='r')
    for info in zf.infolist():
        data = zf.read(info.filename)
        tmprdd = sc.parallelize(data.split('\n')).map(lambda line: line.split('\t'))
        if rdd:
            rdd = sc.union([rdd, tmprdd])
        else:
            rdd = tmprdd
    zf.close()
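
Before building the graph it is worth confirming the union worked (illustrative; counts vary by day):

print rdd.count()       # total raw lines across all six days
print rdd.first()[:5]   # first few tab-separated fields of the first row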

In [4]:
import graphframes
from pyspark.sql import SQLContext

# Keep well-formed rows (58 tab-separated fields) where both actor
# country codes are present and the CAMEO event root code is 19 ("FIGHT"),
# then map each event to an (Actor1CountryCode, Actor2CountryCode) pair
data = rdd.filter(lambda line: len(line) == 58)\
        .filter(lambda line: line[7] != "" and line[7] is not None)\
        .filter(lambda line: line[17] != "" and line[17] is not None)\
        .filter(lambda line: line[28] == "19")\
        .map(lambda line: (line[7], line[17]))
data.cache()
print "data cnt: ", data.count()

# Distinct country codes become the vertices; each event pair becomes an edge
keys = data.flatMap(lambda x: (x[0], x[1])).distinct()
sqlContext = SQLContext(sc)
vertices = keys.map(lambda x: (x,)).toDF(["id"])
edges = data.toDF(["src", "dst"])
g = graphframes.GraphFrame(vertices, edges)


data cnt:  16981
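
A quick look at the graph's size (an illustrative check; the numbers depend on which days were downloaded):

print g.vertices.count(), g.edges.count()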

In [ ]:
# Run PageRank for 20 iterations; the tiny reset probability (0.0001)
# lets the ranking be driven almost entirely by the link structure
results = g.pageRank(resetProbability=0.0001, maxIter=20)

In [ ]:
# Countries with the highest PageRank are the most central in the
# FIGHT-event network
results.vertices.sort('pagerank', ascending=False).select("id", "pagerank").show()
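
show() truncates the output; to work with the ranking programmatically you can collect the top rows instead (a sketch, not from the original notebook):

top10 = results.vertices.sort('pagerank', ascending=False).limit(10).collect()
for row in top10:
    print row['id'], row['pagerank']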
