GDELT: Global Database of Events, Language, and Tone
Column Header: http://gdeltproject.org/data/lookups/CSV.header.historical.txt
CountryCode: http://gdeltproject.org/data/lookups/CAMEO.country.txt
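These lookup files map column names to field positions; we'll refer to fields by index below. A minimal sketch of how the header lookup could be used to find an index (this assumes network access and that the file is a single tab-delimited line of column names; adjust if the format differs):
In [ ]:
import urllib2
header = urllib2.urlopen('http://gdeltproject.org/data/lookups/CSV.header.historical.txt').read()
columns = header.strip().split('\t')
# if the format assumption holds, this prints 30, the index used for the Goldstein scale below
print columns.index('GoldsteinScale')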
Prepare the PySpark environment
In [1]:
import findspark
import os
# point findspark at the local Spark installation so pyspark can be imported
findspark.init('/home/ubuntu/shortcourse/spark-1.5.1-bin-hadoop2.6')
from pyspark import SparkContext, SparkConf
# run locally with two worker threads
conf = SparkConf().setAppName("pyspark-example").setMaster("local[2]")
sc = SparkContext(conf=conf)
First, confirm that a SparkContext object exists in the sc variable:
In [2]:
print sc
Now let's load an RDD with some interesting data. We have the GDELT event data set on our VM as a tab-delimited text file. (Due to VM storage and compute limitations, we use only events from the year 2001.)
In [3]:
raw_events = sc.textFile('/home/ubuntu/shortcourse/data/gdelt').map(lambda x: x.split('\t'))
# raw_events.cache()
Let's see what an object in the RDD looks like
In [4]:
print raw_events.first()
Let's count the number of events we have; count() is an action, so it forces the RDD to be computed.
In [5]:
print raw_events.count()
We see that we have about 5 million events at our disposal. The GDELT event data set collects geopolitical events that occur around the world. Each event is tagged with a Goldstein scale value that measures the potential for the event to destabilize the country. Let's compute and plot a histogram of the Goldstein scale values across all the events in the database. The Goldstein scale value is in the 31st field (index 30). First, let's make sure that plots are displayed inline (see the IPython docs):
In [6]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
Now we'll just confirm that all the Goldstein values are indeed between -10 and 10.
In [7]:
m = raw_events.map(lambda x: float(x[30])).min()
M = raw_events.map(lambda x: float(x[30])).max()
print "Min is %f\nMax is %f" % (m, M)
Here we compute the histogram. This is slightly hacky: each value is mapped to a one-hot vector of bin counts with np.histogram, and reduce then sums those vectors element-wise to produce the full histogram.
In [8]:
bins = np.arange(-10, 11)
one_hot = raw_events.map(lambda x: float(x[30])).map(lambda x: np.histogram([x], bins=bins)[0])
hist = one_hot.reduce(lambda x, y: x + y)
print hist
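A possibly less hacky alternative is PySpark's built-in RDD.histogram, which does the binning in one call when given explicit bucket edges. A sketch using the same bins:
In [ ]:
# histogram(buckets) returns a (buckets, counts) pair
buckets, counts = raw_events.map(lambda x: float(x[30])).histogram([float(b) for b in bins])
print counts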
In [9]:
fig = plt.figure()
ax = fig.add_subplot(111)
t = ax.bar(bins[:-1], hist)
We can also plot the number of events per day for the 10 countries that have the most events in the second half of 2001.
First let's see which country codes appear in the data. Note that we filter out events that don't list a country code.
In [10]:
raw_events.filter(lambda x: x[7] != '').map(lambda x: x[7]).distinct().collect()
Out[10]:
Here we start converting events into counts, aggregated by country and day. First, keep only events that list a country code and fall between June and December 2001. (The dates are zero-padded YYYYMMDD strings, so plain string comparison orders them correctly.)
In [11]:
events_second_half = raw_events.filter(lambda x: x[7] != '').filter(lambda x: x[1] >= '20010601').filter(lambda x: x[1] <= '20011231')
# events_second_half.cache()
Count how many qualifying events we have.
In [12]:
events_second_half.count()
Out[12]:
Transform the events into key-value pairs, where the key is (countrycode, date) and the value is the event count.
In [13]:
country_day_counts = events_second_half.map(lambda x: ((x[7], x[1]), 1)).reduceByKey(lambda x, y: x + y)
# country_day_counts.cache()
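To see what the map/reduceByKey pattern is doing, here is a toy run on a few made-up records (hypothetical values, not taken from the dataset):
In [ ]:
toy = sc.parallelize([('USA', '20010912'), ('USA', '20010912'), ('AFG', '20010912')])
print toy.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b).collect()
# e.g. [(('AFG', '20010912'), 1), (('USA', '20010912'), 2)] -- ordering may vary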
Show the first five.
In [14]:
country_day_counts.take(5)
Out[14]:
In [15]:
from dateutil.parser import parse as parse_date
epoch = parse_date('20010101')

def td2s(td):
    # total seconds in a timedelta (equivalent to td.total_seconds())
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) / 1e6

def day2unix(day):
    # seconds elapsed between a YYYYMMDD date string and 2001-01-01
    return td2s(parse_date(day) - epoch)
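A quick sanity check of these helpers with made-up dates:
In [ ]:
print day2unix('20010101')   # 0.0, the chosen epoch
print day2unix('20010102')   # 86400.0, one day in seconds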
Transform country_day_counts into (country, times, counts) tuples, where times and counts can later be used for plotting.
In [16]:
country_series = (country_day_counts
    # key by country; value is (seconds since 2001-01-01, count)
    .map(lambda x: (x[0][0], (day2unix(x[0][1]), x[1])))
    # gather all (time, count) pairs for each country
    .groupByKey()
    # sort each country's pairs by time
    .map(lambda x: (x[0], sorted(x[1], key=lambda y: y[0])))
    # transpose the pairs into (times, counts)
    .map(lambda x: (x[0], zip(*x[1])))
    # flatten to (country, times, counts)
    .map(lambda x: (x[0], x[1][0], x[1][1])))
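The zip(*...) step is what turns each country's sorted list of (time, count) pairs into separate times and counts sequences. A plain-Python illustration with made-up values:
In [ ]:
pairs = [(0.0, 12), (86400.0, 7), (172800.0, 30)]
times, counts = zip(*pairs)
print times    # (0.0, 86400.0, 172800.0)
print counts   # (12, 7, 30)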
In [17]:
country_series.first()
Out[17]:
In [18]:
random_color = lambda: '#%02x%02x%02x' % tuple(np.random.randint(0, 256, 3))
fig = plt.figure()
ax = fig.add_subplot(111)
# plot the daily event counts for the 10 countries with the most events
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    t = ax.plot(times, events, lw=1, c=random_color())
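The lines are hard to tell apart without a legend; here is a small variation of the same plot that labels each country (same data, just a few extra matplotlib calls):
In [ ]:
fig = plt.figure()
ax = fig.add_subplot(111)
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    ax.plot(times, events, lw=1, c=random_color(), label=country)
ax.set_xlabel('seconds since 2001-01-01')
ax.set_ylabel('events per day')
ax.legend(loc='upper left', fontsize='small')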
What caused the big spike in the plot above? Let's find the (country, day) pair with the largest count.
In [19]:
country_day_counts.reduce(lambda x, y: max(x, y, key=lambda z: z[1]))
Out[19]:
Looks like it was the day after September 11th.
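The same answer can be cross-checked with takeOrdered, which avoids writing the comparison by hand:
In [ ]:
# the single (country, date) key with the largest count
print country_day_counts.takeOrdered(1, key=lambda z: -z[1])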
In [20]:
# stop the spark context
sc.stop()