header
e imprimi-la para a descrição dos campos de nossa base.
In [18]:
sc = SparkContext.getOrCreate()
In [19]:
import os
import numpy as np
filename = os.path.join("Data","Aula03","train.csv")
CrimeRDD = sc.textFile(filename,8)
header = CrimeRDD.take(1)[0] # o cabeçalho é a primeira linha do arquivo
print "Campos disponíveis: {}".format(header)
In [20]:
# EXERCICIO
CrimeHeadlessRDD = CrimeRDD.filter(lambda x: x != header)
firstObject = CrimeHeadlessRDD.take(1)[0]
print firstObject
In [21]:
assert firstObject==u'2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747', 'valor incorreto'
print "OK"
In [22]:
# EXERCICIO
CrimeHeadlessRDD = (CrimeRDD
.filter(lambda x: x != header)
.map(lambda x: x.split(','))
)
firstObjectList = CrimeHeadlessRDD.take(1)[0]
print firstObjectList
In [23]:
assert firstObjectList[0]==u'2015-05-13 23:53:00', 'valores incorretos'
print "OK"
split()
.re
. Vamos utilizar o comando re.split()
para cuidar da separação de nossa base em campos.datetime
através do comando datetime.datetime.strptime()
. Também vamos agrupar as coordenadas X e Y em uma tupla de floats.namedtuple
que permite acessar cada campo de cada objeto pelo nome. Ex.: rec.Dates.
In [24]:
# EXERCICIO
import re
import datetime
from collections import namedtuple
headeritems = header.split(',') # transformar o cabeçalho em lista
del headeritems[-1] # apagar o último item e...
headeritems[-1] = 'COORD' # transformar em COORD
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
Crime = namedtuple('Crime',headeritems) # gera a namedtuple Crime com os campos de header
REGEX = r',(?=(?:[^"]*"[^"]*")*(?![^"]*"))'
# buscar por "," tal que após essa vírgula (?=) ou exista um par de "" ou não tenha " sozinha
# ?= indica para procurarmos pelo padrão após a vírgula
# ?: significa para não interpretar os parênteses como captura de valores
# [^"]* 0 ou sequências de caracteres que não sejam aspas
# [^"]*"[^"]*" <qualquer caracter exceto aspas> " <qualquer caracter exceto aspas> "
# ?! indica para verificar se não existe tal padrão a frente da vírgula
def ParseCrime(rec):
# utilizando re.split() vamos capturar nossos valores
Date, Category, Descript, DayOfWeek, PdDistrict, Resolution, Address, X, Y = tuple(re.split(REGEX,rec))
# Converta a data para o formato datetime
Date = datetime.datetime.strptime(Date, "%Y-%m-%d %H:%M:%S")
# COORD é uma tupla com floats representando X e Y
COORD = (float(X),float(Y))
# O campos 'Resolution' será uma lista dos valores separados por vírgula, sem as aspas
Resolution = map(lambda x: x.strip(), Resolution.split(','))
return Crime(Date, Category, Descript, DayOfWeek, PdDistrict, Resolution, Address, COORD)
# Aplique a função ParseCrime para cada objeto da base
CrimeHeadlessRDD = (CrimeRDD
.filter(lambda x: x != header)
.map(ParseCrime)
)
firstClean = CrimeHeadlessRDD.take(1)[0]
totalRecs = CrimeHeadlessRDD.count()
print firstClean
In [25]:
assert type(firstClean.Dates) is datetime.datetime and type(firstClean.Resolution) is list and type(firstClean.COORD) is tuple,'tipos incorretos'
print "OK"
assert CrimeHeadlessRDD.filter(lambda x: len(x)!=8).count()==0, 'algo deu errado!'
print "OK"
assert totalRecs==878049, 'total de registros incorreto'
print "OK"
In [26]:
# EXERCICIO
CatCountRDD = (CrimeHeadlessRDD
.map(lambda x: (x.Category, 1))
.reduceByKey(lambda x, y : x+y)
)
catCount = sorted(CatCountRDD.collect(), key=lambda x: -x[1])
print catCount
In [27]:
assert catCount[0][1]==174900, 'valores incorretos'
print "OK"
In [28]:
# EXERCICIO
RegionCountRDD = (CrimeHeadlessRDD
.map(lambda x : (x.PdDistrict, 1))
.reduceByKey(lambda x, y : x+y)
)
regCount = sorted(RegionCountRDD.collect(), key=lambda x: -x[1])
print regCount
In [29]:
assert regCount[0][1]==157182, 'valores incorretos'
print "OK"
contagemDiaDaSemana
para float
. Finalmente, o resultado pode ser agrupado pela chave, gerando uma tupla ( DayOfWeek, [ (Pd1, media1), (Pd2, media2), ... ] ). Essa lista pode ser mapeada para um dicionário com o comando dict
.collectAsMap()
.
In [30]:
#DEBUG - Início -
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)
#print (CrimeHeadlessRDD.map(lambda x: ((day2str(x.Dates),x.DayOfWeek),1))
# .reduceByKey(lambda x, y : x)).map(lambda x: (x[0][1], x[1])).reduceByKey(lambda x, y: x+y).take(100)
print CrimeHeadlessRDD.map(lambda x : ((x.DayOfWeek, x.PdDistrict),1)).reduceByKey(lambda x, y: x+y).map(lambda x: (x[0][0],(x[0][1],x[1]))).collect()
#DEBUG - Fim -
In [31]:
# EXERCICIO
from operator import add
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)
totalDatesRDD = (CrimeHeadlessRDD
.map(lambda x: ((day2str(x.Dates),x.DayOfWeek),1))
.reduceByKey(lambda x, y : x)
.map(lambda x: (x[0][1], x[1]))
.reduceByKey(lambda x, y: x+y)
)
crimesWeekDayRegionRDD = (CrimeHeadlessRDD
.map(lambda x : ((x.DayOfWeek, x.PdDistrict),1))
.reduceByKey(lambda x, y: x+y)
.map(lambda x: (x[0][0],(x[0][1],x[1])))
)
RegionAvgPerDayRDD = (crimesWeekDayRegionRDD
.join(totalDatesRDD)
.map(lambda x: (x[0],(x[1][0][0],x[1][0][1]/float(x[1][1]))))
.groupByKey()
.map(lambda x: (x[0],dict(x[1])))
)
RegionAvg = RegionAvgPerDayRDD.collectAsMap()
print RegionAvg['Sunday']
In [32]:
assert np.round(RegionAvg['Sunday']['BAYVIEW'],2)==37.27, 'valores incorretos {}'.format(np.round(RegionAvg[0][2],2))
print "OK"
Keys
. Itere essa variável realizando os seguintes passos:key[0]
não existir no dicionário, crie a entrada key[0]
como um dicionário vazio.key
e gerando a RDD com os valores da tupla. Note que não queremos uma lista de listas.mean()
e stdev()
do PySpark, armazenando na chave RegionAvgSpark[ key[0] ][ key[1] ].
In [34]:
# EXERCICIO
countWeekDayDistRDD = (CrimeHeadlessRDD
.map(lambda x:((day2str(x.Dates),x.DayOfWeek,x.PdDistrict),1))
.reduceByKey(add)
.map(lambda x:((x[0][1],x[0][2]),x[1]))
.groupByKey()
)
# Esse procedimento só é viável se existirem poucas chaves
RegionAvgSpark = {}
Keys = countWeekDayDistRDD.map(lambda rec: rec[0]).collect()
for key in Keys:
listRDD = (countWeekDayDistRDD
.filter(lambda rec: rec[0]==key)
.flatMap(lambda rec: rec[1])
)
if key[0] not in RegionAvgSpark:
RegionAvgSpark[key[0]] = {}
RegionAvgSpark[key[0]][key[1]] = (listRDD.mean(), listRDD.stdev())
print RegionAvgSpark['Sunday']
In [35]:
assert np.round(RegionAvgSpark['Sunday']['BAYVIEW'][0],2)==37.39 and np.round(RegionAvgSpark['Sunday']['BAYVIEW'][1],2)==10.06, 'valores incorretos'
print "OK"
matplotlib
que já vem por padrão na maioria das distribuições do Python (ex.: Anaconda). Outras bibliotecas alternativas interessantes são: Seaborn e Bokeh.count()
armazenando na variável totalDays
. Não se esqueça de converter o valor para float
.zip()
do Python é possível descompactar um dicionário em duas variáveis, uma com as chaves e outra com os valores. Utilizaremos essas variáveis para a plotagem do gráfico.
In [47]:
#Insert a graphic inline
%matplotlib inline
import matplotlib.pyplot as plt
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)
totalDatesRDD = (CrimeHeadlessRDD
.map(lambda rec: (day2str(rec.Dates),1))
.reduceByKey(lambda x,y: x)
)
totalDays = float(totalDatesRDD.count())
avgCrimesRegionRDD = (RegionCountRDD
.map(lambda rec: (rec[0],rec[1]/totalDays))
)
Xticks,Y = zip(*avgCrimesRegionRDD.collectAsMap().items())
indices = np.arange(len(Xticks))
width = 0.35
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.bar(indices,Y, width)
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass
Y
em que a chave é o dia da semana e o valor é uma np.array
contendo a média de cada região para aquele dia.Bottom
que determina qual é o início de cada uma das barras. O início da barra do dia i
deve ser o final da barra do dia i-1
.Bottom
daquele dia.
In [ ]:
# Dias da semana como referência
Day = ['Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday']
# Uma cor para cada dia
Color = ['r','b','g','y','c','k','purple']
# Dicionário (dia, array de médias)
Y = {}
for day in Day:
Y[day] = np.array([RegionAvg[day][x] for x in Xticks])
# Matriz dias x regiões
Bottom = np.zeros( (len(Day),len(Xticks)) )
for i in range(1,len(Day)):
Bottom[i,:] = Bottom[i-1,:]+Y[Day[i-1]]
indices = np.arange(len(Xticks))
width = 0.35
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
# Gera uma lista de plots, um para cada dia
plots = [plt.bar(indices,Y[Day[i]], width, color=Color[i], bottom=Bottom[i]) for i in range(len(Day))]
plt.legend( [p[0] for p in plots], Day,loc='center left', bbox_to_anchor=(1, 0.5) )
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass
In [25]:
# EXERCICIO
parseWeekday = lambda x: '{}-{}-{}'.format(x.day, x.month, x.year)
hoursRDD = (CrimeHeadlessRDD
.map(lambda x: ((parseWeekday(x.Dates),x.Dates.hour),1))
.reduceByKey(lambda x,y: x)
.map(lambda x: (x[0][1],1))
.reduceByKey(lambda x, y: x+y)
)
crimePerHourRDD = (CrimeHeadlessRDD
.map(lambda x: (x.Dates.hour,1))
.reduceByKey(lambda x, y: x+y)
)
avgCrimeHourRDD = (crimePerHourRDD
.join(hoursRDD)
.map(lambda x: (x[0], x[1][0]/float(x[1][1])))
)
crimePerHour = avgCrimeHourRDD.collect()
print crimePerHour[0:5]
In [26]:
assert np.round(crimePerHour[0][1],2)==19.96, 'valores incorretos'
print "OK"
In [28]:
crimePerHourSort = sorted(crimePerHour,key=lambda x: x[0])
X,Y = zip(*crimePerHourSort)
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.plot(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('Avg. Number of crimes')
plt.xlabel('Hour')
pass
dict()
nessa lista para obtermos uma RDD no seguinte formato: (Mes-Ano, {CRIME: quantidade}).crimes
contendo a lista de crimes contidas na lista de pares catCount
computada anteriormente.
In [73]:
# EXERCICIO
parseMonthYear = lambda x: '{}-{}'.format(x.month, x.year)
crimes = map(lambda x: x[0], catCount)
datesCrimesRDD = (CrimeHeadlessRDD
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.cache()
)
print datesCrimesRDD.take(1)
In [72]:
assert datesCrimesRDD.take(1)[0][1][u'KIDNAPPING']==12,'valores incorretos'
print 'ok'
fractionCrimesDateRDD
em que a chave é Mes-Ano e o valor é uma lista da fração de cada tipo de crime ocorridos naquele mês e ano. Para gerar essa lista vamos utilizar o list comprehension do Python de tal forma a calcular a fração para cada crime na variável crimes
.get()
que permite atribuir um valor padrão caso a chave não exista. Ex.: dicionario.get( chave, 0.0)
retornará 0.0 caso a chave não exista.
In [ ]:
# EXERCICIO
totalPerDateRDD = (CrimeHeadlessRDD
.<COMPLETAR>
.<COMPLETAR>
)
fractionCrimesDateRDD = (datesCrimesRDD
.<COMPLETAR>
.<COMPLETAR>
.cache()
)
print fractionCrimesDateRDD.take(1)
In [ ]:
assert np.abs(fractionCrimesDateRDD.take(1)[0][1][0][1]-0.163950)<1e-6,'valores incorretos'
print 'ok'
Statistics.corr()
da biblioteca pyspark.mlllib.stat
.
In [ ]:
from pyspark.mllib.stat import Statistics
corr = Statistics.corr(fractionCrimesDateRDD.map(lambda rec: map(lambda x: x[1],rec[1])))
print corr
In [ ]:
npCorr = np.array(corr)
rowMin = npCorr.min(axis=1).argmin()
colMin = npCorr[rowMin,:].argmin()
print crimes[rowMin], crimes[colMin], npCorr[rowMin,colMin]
npCorr[npCorr==1.] = 0.
rowMax = npCorr.max(axis=1).argmax()
colMax = npCorr[rowMax,:].argmax()
print crimes[rowMax], crimes[colMax], npCorr[rowMax,colMax]
var1RDD
e var2RDD
. Elas são um mapeamento da fractionCrimesDateRDD
filtradas para conter apenas o crime contido em Xlabel e Ylabel, respectivamente.correlationRDD
que mapeará para tuplas de valores, onde os valores são as médias calculadas em fractionCrimesDateRDD.
In [ ]:
# EXERCICIO
Xlabel = 'FORGERY/COUNTERFEITING'#'DRIVING UNDER THE INFLUENCE'
Ylabel = 'NON-CRIMINAL'#'LIQUOR LAWS'
var1RDD = (fractionCrimesDateRDD
.map(lambda rec: (rec[0], filter(lambda x: x[0]==Xlabel,rec[1])[0][1]))
)
var2RDD = (fractionCrimesDateRDD
.map(lambda rec: (rec[0], filter(lambda x: x[0]==Ylabel,rec[1])[0][1]))
)
correlationRDD = (var1RDD
.<COMPLETAR>
.<COMPLETAR>
)
Data = correlationRDD.collect()
print Data[0]
In [ ]:
assert np.abs(Data[0][0]-0.015904)<1e-6, 'valores incorretos'
print 'ok'
In [ ]:
X,Y = zip(*Data)
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.scatter(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.xlabel(Xlabel)
plt.ylabel(Ylabel)
pass
bookedRDD
que contém apenas os registros contendo ARREST no campo Resolution (lembre-se que esse campo é uma lista) e contabilizar a quantidade de registros em cada 'Mes-Ano'. Ao final, vamos mapear para uma RDD contendo apenas os valores contabilizados.
In [ ]:
# EXERCICIO
bookedRDD = (CrimeHeadlessRDD
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
)
Data = bookedRDD.collect()
print Data[:5]
In [ ]:
assert Data[0]==1914,'valores incorretos'
print 'ok'
In [ ]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.hist(Data)
plt.grid(b=True, which='major', axis='y')
plt.xlabel('ARRESTED')
pass
In [ ]:
# EXERCICIO
parseDayMonth = lambda x: '{}-{}'.format(x.month,x.year)
robberyBookedRDD = (CrimeHeadlessRDD
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
)
assaultBookedRDD = (CrimeHeadlessRDD
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
)
robData = robberyBookedRDD.collect()
assData = assaultBookedRDD.collect()
In [ ]:
assert robData[0]==27,'valores incorretos'
print 'ok'
assert assData[0]==152,'valores incorretos'
print 'ok'
In [ ]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.boxplot([robData,assData])
plt.grid(b=True, which='major', axis='y')
plt.ylabel('ARRESTED')
plt.xticks([1,2], ['ROBBERY','ASSAULT'])
pass