Example: Using PySpark to process GDELT event data

GDELT: Global Database of Events, Language, and Tone

http://www.gdeltproject.org/

Column Header: http://gdeltproject.org/data/lookups/CSV.header.historical.txt

CountryCode: http://gdeltproject.org/data/lookups/CAMEO.country.txt

More documentation: http://gdeltproject.org/data.html#rawdatafiles

Prepare the PySpark environment


In [1]:
import findspark
import os
findspark.init('/home/ubuntu/shortcourse/spark-1.5.1-bin-hadoop2.6')

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("pyspark-example").setMaster("local[2]")
sc = SparkContext(conf=conf)

First, confirm that a SparkContext object exists in the sc variable:


In [2]:
print sc


<pyspark.context.SparkContext object at 0xaf8952ac>

Now let's load an RDD with some interesting data. We have the GDELT event data set on our VM as a tab-delimited text file. (Due to the VM's storage and compute limitations, we only use events from the year 2001.)


In [3]:
# Load the tab-delimited GDELT events and split each line into a list of fields
raw_events = sc.textFile('/home/ubuntu/shortcourse/data/gdelt').map(lambda x: x.split('\t'))
# raw_events.cache()

Let's see what an object in the RDD looks like:


In [4]:
print raw_events.first()


[u'35150069', u'20010101', u'200101', u'2001', u'2001.0027', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'', u'AFG', u'AFGHAN', u'AFG', u'', u'', u'', u'', u'', u'', u'', u'0', u'020', u'020', u'02', u'1', u'3', u'9', u'1', u'9', u'5.81545619023003', u'', u'', u'', u'', u'0', u'0', u'0', u'4', u'Kabul, Kabol, Afghanistan', u'AF', u'AF13', u'34.5167', u'69.1833', u'-3378435', u'4', u'Kabul, Kabol, Afghanistan', u'AF', u'AF13', u'34.5167', u'69.1833', u'-3378435', u'20130203']
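
As a quick sanity check, we could also count how many tab-separated fields each record has; this should match the number of columns in the header file linked above.


In [ ]:
# number of fields in the first record (compare with CSV.header.historical.txt)
print len(raw_events.first())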

Let's count the number of events we have. count() is an action, so it triggers computation on the RDD.


In [5]:
print raw_events.count()


4995943

We see that we have about 5 million events at our disposal. The GDELT event data set collects geopolitical events that occur around the world. Each event is tagged with a Goldstein scale value that measures the event's potential to destabilize the country it occurs in. Let's compute and plot a histogram of the Goldstein scale values across all the events in the data set. The Goldstein scale value is in the 31st field (index 30 after splitting). First, let's make sure plots are displayed inline (see the IPython docs):


In [6]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np

Now we'll just confirm that all the Goldstein values are indeed between -10 and 10.


In [7]:
m = raw_events.map(lambda x: float(x[30])).min()
M = raw_events.map(lambda x: float(x[30])).max()
print "Min is %f\nMax is %f" % (m, M)


Min is -10.000000
Max is 10.000000
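
As an optional shortcut, the same check can be done in a single pass: PySpark RDDs have a stats() action that returns the count, mean, min, and max together.


In [ ]:
# one pass over the data instead of two separate min/max jobs
goldstein_stats = raw_events.map(lambda x: float(x[30])).stats()
print "Min is %f\nMax is %f" % (goldstein_stats.min(), goldstein_stats.max())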

Here we compute the histogram ourselves: each value is mapped to a one-hot vector of per-bin counts, and the vectors are summed with reduce. This is slightly hacky.


In [8]:
bins = np.arange(-10, 11)  # 20 unit-wide bins covering [-10, 10]
# map each Goldstein value to a one-hot count vector, then sum the vectors
one_hot = raw_events.map(lambda x: float(x[30])).map(lambda x: np.histogram([x], bins=bins)[0])
hist = one_hot.reduce(lambda x, y: x + y)
print hist


[361427  43191  42173  74599   7369 327620 143987      0 390206 104754
 536623 848223 386263 549724 405942 121053  86844 444520  93687  27738]
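
For comparison, PySpark also provides a built-in RDD.histogram action, which should produce the same counts in one pass without building one-hot vectors by hand.


In [ ]:
# same unit-wide bins from -10 to 10; histogram returns (bucket edges, counts)
edges, counts = raw_events.map(lambda x: float(x[30])).histogram(range(-10, 11))
print counts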

In [9]:
fig = plt.figure()
ax = fig.add_subplot(111)
t = ax.bar(bins[:-1], hist)


We can also plot the number of events each day for the 10 countries that have the most events between June and December 2001.

First, let's see which unique country codes appear in the data. Note that we filter out events that don't list a country code.


In [10]:
raw_events.filter(lambda x: x[7] != '').map(lambda x: x[7]).distinct().collect()


Out[10]:
[u'MCO',
 u'STP',
 u'TON',
 u'BOL',
 u'CHN',
 u'ARE',
 u'ZWE',
 u'KWT',
 u'MEA',
 u'NPL',
 u'ARG',
 u'LIE',
 u'MDV',
 u'TUN',
 u'RWA',
 u'MAR',
 u'MOZ',
 u'GMB',
 u'DEU',
 u'PSE',
 u'SCN',
 u'IRQ',
 u'KAZ',
 u'DZA',
 u'SAM',
 u'TJK',
 u'UZB',
 u'CRB',
 u'NER',
 u'PNG',
 u'HKG',
 u'LAM',
 u'TGO',
 u'ITA',
 u'MUS',
 u'BDI',
 u'HTI',
 u'LVA',
 u'TCD',
 u'AFR',
 u'LAO',
 u'KOR',
 u'PGS',
 u'ARM',
 u'SOM',
 u'PER',
 u'NOR',
 u'LSO',
 u'ROM',
 u'BLZ',
 u'IDN',
 u'SDN',
 u'SWE',
 u'CYP',
 u'HRV',
 u'FSM',
 u'SAF',
 u'SLE',
 u'MNG',
 u'LUX',
 u'MYS',
 u'POL',
 u'TZA',
 u'AUS',
 u'SHN',
 u'SGP',
 u'SYR',
 u'LBN',
 u'PLW',
 u'GUY',
 u'IRL',
 u'ASA',
 u'USA',
 u'ATG',
 u'JAM',
 u'CRI',
 u'AUT',
 u'EUR',
 u'AND',
 u'BGD',
 u'VUT',
 u'CPV',
 u'IRN',
 u'SLB',
 u'PHL',
 u'JPN',
 u'SMR',
 u'BLR',
 u'LKA',
 u'SWZ',
 u'SUR',
 u'PAK',
 u'TMP',
 u'SRB',
 u'BEN',
 u'CAS',
 u'NAF',
 u'MAC',
 u'TTO',
 u'VCT',
 u'GNQ',
 u'MMR',
 u'BRN',
 u'NLD',
 u'KNA',
 u'URY',
 u'BEL',
 u'UGA',
 u'DNK',
 u'PRK',
 u'IND',
 u'PAN',
 u'ISR',
 u'NGA',
 u'MHL',
 u'HUN',
 u'FRA',
 u'SAS',
 u'BHS',
 u'GEO',
 u'OMN',
 u'PRT',
 u'CIV',
 u'GIN',
 u'BHR',
 u'CZE',
 u'MWI',
 u'CAU',
 u'FJI',
 u'SEA',
 u'DMA',
 u'TKM',
 u'GTM',
 u'MEX',
 u'KEN',
 u'NMR',
 u'LBR',
 u'SVK',
 u'NIC',
 u'MDA',
 u'MLI',
 u'KHM',
 u'SEN',
 u'BTN',
 u'GAB',
 u'EGY',
 u'VNM',
 u'SAU',
 u'VEN',
 u'AIA',
 u'NRU',
 u'QAT',
 u'AZE',
 u'AFG',
 u'MDG',
 u'NAM',
 u'COD',
 u'LCA',
 u'UKR',
 u'RUS',
 u'ALB',
 u'WSM',
 u'BGR',
 u'COG',
 u'ETH',
 u'VAT',
 u'EST',
 u'CHE',
 u'WAF',
 u'CAN',
 u'BRB',
 u'NZL',
 u'LTU',
 u'LBY',
 u'TUR',
 u'GNB',
 u'GBR',
 u'THA',
 u'BRA',
 u'CUB',
 u'DOM',
 u'JOR',
 u'MRT',
 u'PRY',
 u'DJI',
 u'COK',
 u'FIN',
 u'WLF',
 u'ESP',
 u'BMU',
 u'CMR',
 u'BWA',
 u'ISL',
 u'ERI',
 u'AGO',
 u'BFA',
 u'ZMB',
 u'TUV',
 u'GRD',
 u'COM',
 u'HND',
 u'KIR',
 u'KGZ',
 u'EAF',
 u'ABW',
 u'COL',
 u'SYC',
 u'YEM',
 u'ZAF',
 u'SLV',
 u'CHL',
 u'ECU',
 u'MLT',
 u'CYM',
 u'WST',
 u'MKD',
 u'CAF',
 u'GHA',
 u'GRC']
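
If we only want the number of distinct country codes rather than the full list, we can replace collect() with count().


In [ ]:
# number of distinct Actor1 country codes (events without a code excluded)
print raw_events.filter(lambda x: x[7] != '').map(lambda x: x[7]).distinct().count()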

Here we select the events we will later aggregate by country and day: events that have a country code and are dated between June 1 and December 31, 2001.


In [11]:
events_second_half = raw_events.filter(lambda x: x[7] != '') \
                               .filter(lambda x: x[1] >= '20010601') \
                               .filter(lambda x: x[1] <= '20011231')
# events_second_half.cache()

Count how many qualifying events we have.


In [12]:
events_second_half.count()


Out[12]:
1852211
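
As an optional check that the date filter behaved as intended, we can look at the earliest and latest dates that pass it.


In [ ]:
# min/max of the SQLDATE field after filtering; both should lie in 20010601-20011231
dates = events_second_half.map(lambda x: x[1])
print dates.min(), dates.max()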

Transform the events into key-value pairs, where the key is (country code, date) and the value is the event count.


In [13]:
country_day_counts = events_second_half.map(lambda x: ((x[7], x[1]), 1)).reduceByKey(lambda x, y: x + y)
# country_day_counts.cache()

Show the first five.


In [14]:
country_day_counts.take(5)


Out[14]:
[((u'CYP', u'20011218'), 21),
 ((u'DZA', u'20010812'), 18),
 ((u'PGS', u'20010712'), 1),
 ((u'QAT', u'20010602'), 11),
 ((u'LTU', u'20010726'), 57)]

To plot time series, we first convert each YYYYMMDD date string into the number of seconds elapsed since January 1, 2001.


In [15]:
from dateutil.parser import parse as parse_date
epoch = parse_date('20010101')  # reference date for the time axis
def td2s(td):
    # timedelta -> seconds (equivalent of timedelta.total_seconds())
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 1000000) / 1e6
def day2unix(day):
    # seconds between a YYYYMMDD date string and the epoch
    return td2s(parse_date(day) - epoch)

Transform country_day_counts into (country, times, counts) tuples, where times and counts are parallel sequences that we will use for plotting.


In [16]:
country_series = (
    country_day_counts
    # key by country; value is (seconds since epoch, count)
    .map(lambda x: (x[0][0], (day2unix(x[0][1]), x[1])))
    # collect all (time, count) pairs for each country
    .groupByKey()
    # sort each country's pairs chronologically
    .map(lambda x: (x[0], sorted(x[1], key=lambda y: y[0])))
    # unzip into parallel (times, counts) sequences
    .map(lambda x: (x[0], zip(*x[1])))
    # flatten to (country, times, counts)
    .map(lambda x: (x[0], x[1][0], x[1][1])))

In [17]:
country_series.first()


Out[17]:
(u'MCO',
 (13046400.0,
  13305600.0,
  13392000.0,
  13478400.0,
  13564800.0,
  13651200.0,
  13996800.0,
  14083200.0,
  14169600.0,
  14256000.0,
  14515200.0,
  14601600.0,
  14688000.0,
  14774400.0,
  15120000.0,
  15552000.0,
  15638400.0,
  15724800.0,
  15811200.0,
  15897600.0,
  15984000.0,
  16070400.0,
  16156800.0,
  16416000.0,
  16502400.0,
  16588800.0,
  16675200.0,
  16934400.0,
  17193600.0,
  17366400.0,
  17452800.0,
  17539200.0,
  17625600.0,
  17712000.0,
  18057600.0,
  18144000.0,
  18748800.0,
  18921600.0,
  19008000.0,
  19094400.0,
  19526400.0,
  19699200.0,
  19785600.0,
  20131200.0,
  20217600.0,
  20304000.0,
  20563200.0,
  20995200.0,
  21081600.0,
  21168000.0,
  21254400.0,
  21427200.0,
  21513600.0,
  21859200.0,
  21945600.0,
  22118400.0,
  22291200.0,
  22377600.0,
  22636800.0,
  22982400.0,
  23068800.0,
  23328000.0,
  23414400.0,
  23587200.0,
  23673600.0,
  23760000.0,
  23846400.0,
  24192000.0,
  24278400.0,
  24364800.0,
  24451200.0,
  24537600.0,
  24883200.0,
  24969600.0,
  25056000.0,
  25488000.0,
  25574400.0,
  25660800.0,
  25747200.0,
  25833600.0,
  25920000.0,
  26006400.0,
  26092800.0,
  26265600.0,
  26352000.0,
  26438400.0,
  26611200.0,
  26697600.0,
  26870400.0,
  26956800.0,
  27216000.0,
  27302400.0,
  27388800.0,
  27561600.0,
  27648000.0,
  27907200.0,
  28252800.0,
  28339200.0,
  28598400.0,
  28684800.0,
  28771200.0,
  28857600.0,
  29030400.0,
  29116800.0,
  29203200.0,
  29289600.0,
  29376000.0,
  29548800.0,
  29721600.0,
  29808000.0,
  29980800.0,
  30067200.0,
  30240000.0,
  30412800.0,
  30499200.0,
  30585600.0,
  30672000.0,
  30931200.0,
  31104000.0,
  31190400.0),
 (1,
  2,
  1,
  1,
  9,
  2,
  4,
  1,
  4,
  10,
  2,
  4,
  2,
  4,
  9,
  4,
  3,
  3,
  1,
  7,
  4,
  3,
  1,
  11,
  2,
  7,
  9,
  1,
  2,
  1,
  1,
  2,
  4,
  4,
  2,
  3,
  2,
  2,
  3,
  4,
  3,
  1,
  1,
  7,
  3,
  3,
  7,
  5,
  10,
  6,
  12,
  1,
  1,
  5,
  3,
  1,
  1,
  1,
  6,
  2,
  2,
  2,
  1,
  1,
  2,
  2,
  4,
  2,
  8,
  1,
  2,
  3,
  2,
  2,
  4,
  2,
  10,
  5,
  1,
  2,
  1,
  7,
  3,
  4,
  1,
  3,
  1,
  4,
  7,
  8,
  4,
  1,
  2,
  3,
  1,
  1,
  6,
  2,
  1,
  2,
  2,
  1,
  5,
  3,
  1,
  6,
  3,
  3,
  2,
  2,
  2,
  2,
  2,
  7,
  8,
  4,
  1,
  1,
  5,
  16))

In [18]:
# random hex color string for each country's line
random_color = lambda: '#%02x%02x%02x' % tuple(np.random.randint(0, 256, 3))
fig = plt.figure()
ax = fig.add_subplot(111)
# plot daily event counts for the 10 countries with the most events
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    t = ax.plot(times, events, lw=1, c=random_color())
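
The lines above are hard to tell apart without labels. A small optional tweak is to label each line with its country code and add a legend (plain matplotlib, nothing Spark-specific).


In [ ]:
fig = plt.figure()
ax = fig.add_subplot(111)
for (country, times, events) in country_series.takeOrdered(10, lambda x: -sum(x[2])):
    # label each line so the legend can identify the country
    ax.plot(times, events, lw=1, c=random_color(), label=country)
ax.set_xlabel('seconds since 2001-01-01')
ax.set_ylabel('events per day')
leg = ax.legend(loc='upper left')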


What is causing the big spike in the plot above?


In [19]:
# find the (country, day) pair with the largest event count
country_day_counts.reduce(lambda x, y: max(x, y, key=lambda z: z[1]))


Out[19]:
((u'USA', u'20010912'), 2387)

Looks like it was the day after September 11th.
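
We could also list the busiest (country, day) pairs directly with takeOrdered instead of a single reduce, for example the top five.


In [ ]:
# the five largest daily counts across all countries
country_day_counts.takeOrdered(5, lambda x: -x[1])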


In [20]:
# stop the spark context
sc.stop()
