Package Install

$ sudo apt-get install python2.7-dev
$ sudo easy_install pip
$ pip install jupyter
$ pip install pandas
$ pip install google-api-python-client

Jupyter Spark Environment

These exports make the pyspark launcher start a Jupyter notebook as the Spark driver; the --packages flag pulls in GraphFrames built for Spark 2.0 / Scala 2.11.

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
pyspark --master local[2] --packages graphframes:graphframes:0.3.0-spark2.0-s_2.11
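
Alternatively, Spark can be attached from inside an already-running notebook. A minimal sketch, assuming the findspark package is installed and SPARK_HOME points at a Spark 2.0 install:

import os
# submit args must be set before pyspark is imported, and must end with "pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--master local[2] "
    "--packages graphframes:graphframes:0.3.0-spark2.0-s_2.11 pyspark-shell")

import findspark
findspark.init()  # locates Spark via SPARK_HOME and puts pyspark on sys.path

from pyspark import SparkContext
sc = SparkContext()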

In [1]:
import pandas as pd

# pull 100k events with CAMEO root code 19 ("fight") from the public GDELT table
projectId = 'spark-151209'
query = """SELECT Actor1CountryCode, Actor2CountryCode, EventRootCode
FROM [gdelt-bq:full.events]
WHERE Actor1CountryCode != ""
AND Actor2CountryCode != ""
AND EventRootCode = '19'
LIMIT 100000"""
df = pd.read_gbq(query, project_id=projectId)


Requesting query... ok.
Query running...
Query done.
Cache hit.

Retrieving results...
Got 100000 rows.

Total time taken 6.56 s.
Finished at 2017-01-12 09:16:12.
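
Before handing the frame to Spark, a quick sanity check in pandas confirms the shape of the result:

df.shape  # expect (100000, 3)
df['Actor1CountryCode'].value_counts().head()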

In [2]:
from pyspark.sql import SQLContext

# convert the pandas DataFrame into a Spark DataFrame
sqlctx = SQLContext(sc)
sdf = sqlctx.createDataFrame(df)
sdf.show()


+-----------------+-----------------+-------------+
|Actor1CountryCode|Actor2CountryCode|EventRootCode|
+-----------------+-----------------+-------------+
|              USA|              USA|           19|
|              USA|              NZL|           19|
|              USA|              USA|           19|
|              CAN|              POL|           19|
|              USA|              USA|           19|
|              USA|              USA|           19|
|              ZAF|              ZAF|           19|
|              GBR|              USA|           19|
|              USA|              USA|           19|
|              USA|              USA|           19|
|              USA|              USA|           19|
|              BGD|              BGD|           19|
|              USA|              USA|           19|
|              NZL|              NZL|           19|
|              UKR|              POL|           19|
|              SAU|              ARE|           19|
|              SAU|              ARE|           19|
|              AFG|              IRN|           19|
|              AFG|              AFG|           19|
|              ITA|              USA|           19|
+-----------------+-----------------+-------------+
only showing top 20 rows
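
createDataFrame infers column types by sampling the pandas frame. Since all three columns are strings here, the schema can also be pinned explicitly; a sketch:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("Actor1CountryCode", StringType(), True),
    StructField("Actor2CountryCode", StringType(), True),
    StructField("EventRootCode", StringType(), True),
])
sdf = sqlctx.createDataFrame(df, schema)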


In [3]:
import graphframes

In [4]:
# distinct country codes appearing as either actor become the vertices
keys = sdf.rdd.flatMap(lambda x: (x[0], x[1])).distinct()
keylist = keys.collect()  # materialized locally; not used further below
vertices = keys.map(lambda x: (x,)).toDF(["id"])
# each event row becomes a src -> dst edge labeled "src:dst"
edges = sdf.rdd.map(lambda x: (x[0], x[1], x[0] + ":" + x[1])).toDF(["src", "dst", "relation"])
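
The same vertices and edges can be built without dropping to the RDD API; a sketch using only DataFrame operations:

from pyspark.sql import functions as F

vertices = (sdf.select(F.col("Actor1CountryCode").alias("id"))
               .union(sdf.select(F.col("Actor2CountryCode").alias("id")))
               .distinct())
edges = sdf.select(F.col("Actor1CountryCode").alias("src"),
                   F.col("Actor2CountryCode").alias("dst"),
                   F.concat_ws(":", "Actor1CountryCode", "Actor2CountryCode").alias("relation"))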

In [5]:
# build the graph and run PageRank until it converges within the given tolerance
g = graphframes.GraphFrame(vertices, edges)
results = g.pageRank(resetProbability=0.15, tol=0.01)
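
pageRank also accepts a fixed iteration count in place of a convergence tolerance (the two arguments are mutually exclusive), for example:

results = g.pageRank(resetProbability=0.15, maxIter=10)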

In [6]:
results.vertices.sort('pagerank', ascending=False).select("id", "pagerank").show()


+---+------------------+
| id|          pagerank|
+---+------------------+
|USA|31.021184071302333|
|SYR| 8.286111156778835|
|GBR| 6.005561114899255|
|AFG| 5.162184088030115|
|RUS| 3.934112277405712|
|IRQ| 3.879846182250965|
|ISR|3.8232246351402797|
|FRA|3.5474286759348614|
|TUR|3.3192346271949136|
|DEU|3.1362004218725374|
|PAK| 3.021622028295468|
|AFR|2.7670425768775684|
|AUS| 2.740062004184414|
|CHN|2.7033085585495398|
|IRN| 2.332781842266423|
|NGA| 2.202476105742932|
|JPN| 2.175917593090732|
|SAU|  2.11733321954107|
|PSE|2.1140862538489116|
|CAN|1.9687376455852512|
+---+------------------+
only showing top 20 rows
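
Since show() only prints, the top-ranked vertices can be pulled back into pandas for further local analysis, e.g.:

from pyspark.sql import functions as F
top = results.vertices.orderBy(F.desc("pagerank")).limit(20).toPandas()
top.head()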
