In [1]:
from pyspark import SparkConf, SparkContext
In [2]:
conf = SparkConf()
conf.setMaster('spark://ip-172-31-9-200:7077')  # standalone cluster master URL
conf.setAppName('spark_analytics_chpt_2')
conf.set('spark.executor.memory', '10g')        # give each executor 10 GB of heap
sc = SparkContext(conf=conf)
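With the context up, a quick sanity check that the driver is connected and sees the expected resources (these attributes exist on recent PySpark versions):

print(sc.version)             # Spark version the driver is running against
print(sc.defaultParallelism)  # default partition count for new RDDs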
In [3]:
rawblocks = sc.textFile('linkage/')  # read every file under linkage/ as one RDD of lines
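Before running any actions, it can be worth checking how many partitions Spark created for the input; a minimal check, assuming linkage/ holds the unzipped CSV block files:

rawblocks.getNumPartitions()  # one partition per input split; more partitions, more parallelism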
In [4]:
rawblocks.first()
Out[4]:
In [5]:
rawblocks.take(10)
Out[5]:
In [6]:
rawblocks.count()
Out[6]:
In [7]:
noheader = rawblocks.filter(lambda line: 'id_1' not in line)  # drop the CSV header line(s)
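Substring matching works here, but it would also drop any data line that happened to contain the token. A slightly tighter variant, assuming each block file's header starts with the quoted field "id_1":

noheader = rawblocks.filter(lambda line: not line.startswith('"id_1"'))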
In [8]:
noheader.first()
Out[8]:
In [9]:
from itertools import chain

def toBool(s):
    # the match field is the literal string 'TRUE' or 'FALSE'
    if 'TRUE' in s:
        return True
    if 'FALSE' in s:
        return False

def toFloat(s):
    # missing scores are encoded as '?'; map them to NaN
    if '?' in s:
        return float('nan')
    else:
        return float(s)

def parse(line):
    # each record is: id_1, id_2, nine match scores, matched flag
    pieces = line.split(',')
    id1 = int(pieces[0])
    id2 = int(pieces[1])
    scores = [toFloat(x) for x in pieces[2:11]]
    matched = toBool(pieces[11])
    merged = [[id1], [id2], scores, [matched]]
    return list(chain(*merged))
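Before mapping parse over the whole RDD, it is cheap to sanity-check it locally on a single line. The values below are made up for illustration, not drawn from the dataset:

sample = '37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE'
parse(sample)
# -> [37291, 53113, 0.8333..., nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0, 0.0, True]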
In [10]:
md = noheader.map(parse)
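Since md is reused by several independent actions below (take, countByValue, and the per-column stats), caching it avoids re-reading and re-parsing the input on every pass:

md.cache()  # keep the parsed records in executor memory across the following actions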
In [11]:
md.take(10)
Out[11]:
In [12]:
md.map(lambda x: x[11]).countByValue()  # x[11] is the matched flag; tallies True vs. False records
Out[12]:
In [13]:
import math
md.map(lambda x: x[2]).filter(lambda x: not math.isnan(x)).stats()  # summary statistics for the first score column, skipping NaNs
Out[13]:
In [14]:
md.map(lambda x: x[3]).filter(lambda x: not math.isnan(x)).stats()  # same, for the second score column
Out[14]:
In [15]:
md.map(lambda x: x[10]).filter(lambda x: not math.isnan(x)).stats()  # same, for the last score column
Out[15]:
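Running one cell per score column quickly gets repetitive. A compact sketch that applies the same filter-then-stats pattern to all nine score columns (indices 2 through 10):

import math
for i in range(2, 11):
    # bind i as a default argument so each serialized lambda keeps its own column index
    s = md.map(lambda x, i=i: x[i]).filter(lambda v: not math.isnan(v)).stats()
    print(i, s)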