In [1]:
from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
# Configure and start the Spark driver: standalone master on this cluster,
# a descriptive app name, and 10 GB of memory per executor.
# SparkConf setters return self, so the calls chain fluently.
conf = (
    SparkConf()
    .setMaster('spark://ip-172-31-9-200:7077')
    .setAppName('spark_analytics_chpt_2')
    .set("spark.executor.memory", "10g")
)
sc = SparkContext(conf=conf)

In [3]:
rawblocks = sc.textFile('linkage/')

In [4]:
rawblocks.first()


Out[4]:
u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"'

In [5]:
rawblocks.take(10)


Out[5]:
[u'"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"',
 u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE',
 u'39086,47614,1,?,1,?,1,1,1,1,1,TRUE',
 u'70031,70237,1,?,1,?,1,1,1,1,1,TRUE',
 u'84795,97439,1,?,1,?,1,1,1,1,1,TRUE',
 u'36950,42116,1,?,1,1,1,1,1,1,1,TRUE',
 u'42413,48491,1,?,1,?,1,1,1,1,1,TRUE',
 u'25965,64753,1,?,1,?,1,1,1,1,1,TRUE',
 u'49451,90407,1,?,1,?,1,1,1,1,0,TRUE',
 u'39932,40902,1,?,1,?,1,1,1,1,1,TRUE']

In [6]:
rawblocks.count()


Out[6]:
5749142

In [7]:
noheader = rawblocks.filter(lambda line: 'id_1' not in line)

In [8]:
noheader.first()


Out[8]:
u'37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'

In [9]:
from itertools import chain

def toBool(s):
    """Parse the is_match CSV field into a Python bool.

    Args:
        s: raw field text; expected to contain 'TRUE' or 'FALSE'
           (substring match, so quoted values like '"TRUE"' also work).

    Returns:
        True if the field contains 'TRUE', False if it contains 'FALSE'.

    Raises:
        ValueError: if the field contains neither marker. The original
        implementation fell through and silently returned None here,
        which would poison downstream boolean logic; failing loudly
        surfaces malformed rows immediately.
    """
    if 'TRUE' in s:
        return True
    if 'FALSE' in s:
        return False
    raise ValueError('cannot parse boolean field: %r' % (s,))

def toFloat(s):
    """Convert a raw CSV score field to float.

    Fields containing the missing-value marker '?' map to NaN; every
    other field is parsed with float().
    """
    return float('nan') if '?' in s else float(s)

def parse(line):
    """Parse one record-linkage CSV row into a flat 12-element list.

    Args:
        line: one comma-separated data row (header already filtered out),
              with columns id_1, id_2, nine comparison scores, is_match.

    Returns:
        [id1, id2, score_0 .. score_8, matched] where ids are ints,
        scores are floats (NaN for missing '?'), and matched is a bool.
    """
    pieces = line.split(',')
    id1 = int(pieces[0])
    id2 = int(pieces[1])
    scores = [toFloat(x) for x in pieces[2:11]]
    matched = toBool(pieces[11])
    # Plain list concatenation replaces the original
    # chain(*[[id1], [id2], scores, [matched]]) — identical result,
    # without wrapping each scalar in a singleton list first.
    return [id1, id2] + scores + [matched]

In [10]:
md = noheader.map(parse)

In [11]:
md.take(10)


Out[11]:
[[37291,
  53113,
  0.833333333333333,
  nan,
  1.0,
  nan,
  1.0,
  1.0,
  1.0,
  1.0,
  0.0,
  True],
 [39086, 47614, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [70031, 70237, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [84795, 97439, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [36950, 42116, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [42413, 48491, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [25965, 64753, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [49451, 90407, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 0.0, True],
 [39932, 40902, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True],
 [46626, 47940, 1.0, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 1.0, True]]

In [12]:
md.map(lambda x: x[11]).countByValue()


Out[12]:
defaultdict(int, {False: 5728201, True: 20931})

In [13]:
# NOTE(review): import belongs in the top import cell; left in place so this
# cell still runs stand-alone.
import math
# Summary statistics for column 2 (cmp_fname_c1 per the header in Out[4]),
# excluding NaNs, which stats() would otherwise fold into the results.
md.map(lambda x: x[2]).filter(lambda x: not math.isnan(x)).stats()


Out[13]:
(count: 5748125, mean: 0.712902470443, stdev: 0.3887583258, max: 1.0, min: 0.0)

In [14]:
md.map(lambda x: x[3]).filter(lambda x: not math.isnan(x)).stats()


Out[14]:
(count: 103698, mean: 0.90001767189, stdev: 0.271316302365, max: 1.0, min: 0.0)

In [15]:
md.map(lambda x: x[10]).filter(lambda x: not math.isnan(x)).stats()


Out[15]:
(count: 5736289, mean: 0.00552866147434, stdev: 0.074149142791, max: 1.0, min: 0.0)