Example: Use pyspark to process GDELT event data

GDELT: Global Database of Events, Language, and Tone

http://www.gdeltproject.org/

Column Header: http://gdeltproject.org/data/lookups/CSV.header.historical.txt

CountryCode: http://gdeltproject.org/data/lookups/CAMEO.country.txt

More doc: http://gdeltproject.org/data.html#rawdatafiles

Prepare pyspark environment


In [ ]:
import findspark
import os
findspark.init('/home/ubuntu/shortcourse/spark-1.5.1-bin-hadoop2.6')

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("pyspark-example").setMaster("local[2]")
sc = SparkContext(conf=conf)

First, verify that a SparkContext object exists in the sc variable:


In [ ]:
print sc

Now let's load an RDD with some interesting data. We have the GDELT event data set on our VM as a tab-delimited text file. (Due to the VM's storage and compute limitations, we only use the data for 2001.)

We use a local file this time; the path is '/home/ubuntu/shortcourse/data/gdelt'.

Please read the file and map each line to a list of fields (the file is tab-delimited).


In [ ]:
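One way to fill in this cell (a minimal sketch; the RDD name events is our choice):

# Load the tab-delimited GDELT file and split each line into a list of fields
raw = sc.textFile('/home/ubuntu/shortcourse/data/gdelt')
events = raw.map(lambda line: line.split('\t'))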

Let's see what an object in the RDD looks like.

Take the first element from the created RDD.


In [ ]:
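A possible answer, assuming the RDD from the previous step is named events:

# Look at the first parsed record
print events.first()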

Let's count the number of events we have.


In [ ]:
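A sketch, again assuming the events RDD:

# Count the total number of events (this triggers a full pass over the data)
print events.count()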

We should see about 5 million events at our disposal. The GDELT event data set collects geopolitical events that occur around the world. Each event is tagged with a Goldstein scale value that measures the potential for the event to destabilize the country. Let's compute and plot a histogram of the Goldstein scale values across all the events in the database. The Goldstein scale value is present in the 31st field.

First, let's make sure that plots are displayed inline (see the IPython docs):


In [ ]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

Now we'll confirm that all the Goldstein values are indeed between -10 and 10.

Compute and print the max and min values of the 31st field.


In [ ]:
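One way to do it (a sketch; goldstein is a name we introduce here):

# GoldsteinScale is the 31st field (index 30); parse it as a float
goldstein = events.map(lambda fields: float(fields[30]))
print goldstein.max(), goldstein.min()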

Here we compute the histogram and print it out.


In [ ]:
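A sketch using the RDD histogram method, assuming the goldstein RDD from the previous cell; 20 evenly spaced buckets is an arbitrary choice:

# histogram(20) returns the bucket boundaries and the count in each bucket
buckets, counts = goldstein.histogram(20)
print buckets
print counts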

Plot the histogram.


In [ ]:
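A sketch, assuming buckets and counts from the previous cell:

# Draw the histogram as a bar chart; there is one more boundary than there are buckets
width = buckets[1] - buckets[0]
plt.bar(buckets[:-1], counts, width=width)
plt.xlabel('Goldstein scale')
plt.ylabel('number of events')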

We can also plot the number of events per day for the 10 countries that have the most events in the second half of 2001.

First we can see how many unique country codes are available. Note that we filter out events that don't list a country code.

Show the distinct country codes (the 8th field).


In [ ]:
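One possible solution (a sketch; codes is our name):

# The country code is the 8th field (index 7); drop empty codes, then deduplicate
codes = events.map(lambda fields: fields[7]).filter(lambda code: code != '').distinct()
print codes.count()
print codes.collect()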

Here we convert the events into counts, aggregated by country and day, for all events in the second half of 2001.

First, filter the raw events: keep only events from the second half of 2001, and drop events that don't list a country code.


In [ ]:
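A sketch of the filter; filtered is our name. The date is the 2nd field in YYYYMMDD form, so a string comparison against '20010701' works:

# Keep events dated 2001-07-01 or later that also carry a country code;
# cache() is optional but helps because this RDD is reused below
filtered = events.filter(lambda f: f[1] >= '20010701' and f[7] != '').cache()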

Count how many qualified events we have.


In [ ]:
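Assuming the filtered RDD from the previous cell:

print filtered.count()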

Transform the events into key-value pairs, where the key is (country code (8th field), date (2nd field)) and the value is the event count:

((code, date), count)

In [ ]:
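One way to build the pairs (a sketch, assuming the filtered RDD):

# Key each event by (country code, date), then sum the ones per key
country_day_counts = (filtered
                      .map(lambda f: ((f[7], f[1]), 1))
                      .reduceByKey(lambda a, b: a + b))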

Show the first five.


In [ ]:
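For example:

print country_day_counts.take(5)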


In [ ]:
# Helper functions to convert a date string into the elapsed time within the year, in seconds (as a float)

from dateutil.parser import parse as parse_date
epoch = parse_date('20010101')
def td2s(td):
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) / 1e6
def day2unix(day):
    return td2s(parse_date(day) - epoch)

Aggregate the events by country and transform country_day_counts into (country, times, counts), where times and counts will later be used for plotting. Note that the times and their corresponding counts should be sorted by time.


In [ ]:
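A possible approach (a sketch; to_series and country_series are names we introduce, and day2unix is the helper defined above):

# For each country, gather its (time, count) pairs, sort them by time,
# and split them into parallel lists for plotting
def to_series(pairs):
    pairs = sorted(pairs)
    times = [t for t, c in pairs]
    cnts = [c for t, c in pairs]
    return times, cnts

country_series = (country_day_counts
                  .map(lambda kv: (kv[0][0], (day2unix(kv[0][1]), kv[1])))
                  .groupByKey()
                  .map(lambda kv: (kv[0],) + to_series(kv[1])))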

Show the first item.


In [ ]:
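Assuming country_series from the previous cell:

# Look at one (country, times, counts) record
code, times, cnts = country_series.first()
print code
print times[:5]
print cnts[:5]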

Plot the figure with time on the x axis and event count on the y axis, for the 10 countries with the most events.


In [ ]:
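A sketch of the plot; top() with a key function picks the 10 countries with the largest total counts:

# Select the 10 countries with the most events overall, then plot one line per country
top10 = country_series.top(10, key=lambda rec: sum(rec[2]))

plt.figure(figsize=(12, 6))
for code, times, cnts in top10:
    plt.plot(times, cnts, label=code)
plt.xlabel('seconds since 2001-01-01')
plt.ylabel('events per day')
plt.legend()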

What is the big spike in the lines above? Try to see what's going on using reduce and max.


In [ ]:
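One way to investigate, using reduce (max with a key function gives the same answer):

# Find the (country, date) pair with the largest single-day count
busiest = country_day_counts.reduce(lambda a, b: a if a[1] >= b[1] else b)
print busiest

# Equivalent, using max with a key function
print country_day_counts.max(key=lambda kv: kv[1])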

Looks like it was the day after September 11th.


In [ ]:
# stop the spark context
sc.stop()

In [ ]: