header e imprimi-la para a descrição dos campos de nossa base.
In [1]:
import os
import numpy as np
from pyspark import SparkContext
sc = SparkContext()
filename = os.path.join("Data","Aula03","Crime.csv")
CrimeRDD = sc.textFile(filename,8)
header = CrimeRDD.take(1)[0] # o cabeçalho é a primeira linha do arquivo
print "Campos disponíveis: {}".format(header)
In [2]:
# EXERCICIO
CrimeHeadlessRDD = CrimeRDD.filter(lambda x:x!=header)#<COMPLETAR>
firstObject = CrimeHeadlessRDD.take(1)[0]
print firstObject
In [3]:
assert firstObject==u'2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747', 'valor incorreto'
print "OK"
In [4]:
# EXERCICIO
CrimeHeadlessRDD = (CrimeRDD.filter(lambda x: x!=header).map(lambda y: y.split(",")))
#.map(lambda x: x!= header)#<COMPLETAR>
#.map(lambda y: y.split()#<COMPLETAR>
#)
firstObjectList = CrimeHeadlessRDD.take(1)[0]
print firstObjectList
In [5]:
assert firstObjectList[0]==u'2015-05-13 23:53:00', 'valores incorretos'
print "OK"
split().re. Vamos utilizar o comando re.split() para cuidar da separação de nossa base em campos.datetime através do comando datetime.datetime.strptime(). Também vamos agrupar as coordenadas X e Y em uma tupla de floats.namedtuple que permite acessar cada campo de cada objeto pelo nome. Ex.: rec.Dates.
In [6]:
# EXERCICIO
import re
import datetime
from collections import namedtuple
headeritems = header.split(',') # transformar o cabeçalho em lista
del headeritems[-1] # apagar o último item e...
headeritems[-1] = 'COORD' # transformar em COORD
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
Crime = namedtuple('Crime',headeritems) # gera a namedtuple Crime com os campos de header
REGEX = r',(?=(?:[^"]*"[^"]*")*(?![^"]*"))'
# buscar por "," tal que após essa vírgula (?=) ou exista um par de "" ou não tenha " sozinha
# ?= indica para procurarmos pelo padrão após a vírgula
# ?: significa para não interpretar os parênteses como captura de valores
# [^"]* 0 ou sequências de caracteres que não sejam aspas
# [^"]*"[^"]*" <qualquer caracter exceto aspas> " <qualquer caracter exceto aspas> "
# ?! indica para verificar se não existe tal padrão a frente da vírgula
def ParseCrime(rec):
# utilizando re.split() vamos capturar nossos valores
Date, Category, Descript, DayOfWeek, PdDistrict, Resolution, Address, X, Y = re.split(REGEX,rec)#<COMPLETAR>
# Converta a data para o formato datetime
Date = datetime.datetime.strptime(Date, "%Y-%m-%d %H:%M:%S")
# COORD é uma tupla com floats representando X e Y
COORD = (X,Y)#<COMPLETAR>
# O campos 'Resolution' será uma lista dos valores separados por vírgula, sem as aspas
Resolution = Resolution.split(",")#<COMPLETAR>
return Crime(Date, Category, Descript, DayOfWeek, PdDistrict, Resolution, Address, COORD)
# Aplique a função ParseCrime para cada objeto da base
#CrimeHeadlessRDD = (CrimeRDD.map(ParseCrime)
# .<COMPLETAR>
# .<COMPLETAR>
# )
CrimeHeadlessRDD = CrimeRDD.filter(lambda x: x!=header).map(ParseCrime)
firstClean = CrimeHeadlessRDD.take(1)[0]
totalRecs = CrimeHeadlessRDD.count()
print firstClean
In [8]:
assert type(firstClean.Dates) is datetime.datetime and type(firstClean.Resolution) is list and type(firstClean.COORD) is tuple,'tipos incorretos'
print "OK"
assert CrimeHeadlessRDD.filter(lambda x: len(x)!=8).count()==0, 'algo deu errado!'
print "OK"
assert totalRecs==878049, 'total de registros incorreto'
print "OK"
In [9]:
# EXERCICIO
from operator import add
CatCountRDD = CrimeHeadlessRDD.map(lambda x:(x.Category,1)).reduceByKey(add)
#.<COMPLETAR>
#.<COMPLETAR>
#)
#contagemFinal = (palavrasRDD.map(lambda x:(x,1)).reduceByKey(add)
catCount = sorted(CatCountRDD.collect(), key=lambda x: -x[1])
print catCount
In [10]:
assert catCount[0][1]==174900, 'valores incorretos'
print "OK"
In [11]:
# EXERCICIO
RegionCountRDD = CrimeHeadlessRDD.map(lambda x:(x.PdDistrict,1)).reduceByKey(add)
# .<COMPLETAR>
# .<COMPLETAR>
#)
regCount = sorted(RegionCountRDD.collect(), key=lambda x: -x[1])
print regCount
In [12]:
assert regCount[0][1]==157182, 'valores incorretos'
print "OK"
contagemDiaDaSemana para float. Finalmente, o resultado pode ser agrupado pela chave, gerando uma tupla ( DayOfWeek, [ (Pd1, media1), (Pd2, media2), ... ] ). Essa lista pode ser mapeada para um dicionário com o comando dict.collectAsMap().
In [70]:
# EXERCICIO
from operator import add
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)
totalDatesRDD = CrimeHeadlessRDD.map(lambda x : ((day2str(x.Dates),x.DayOfWeek),1)).reduceByKey(lambda y,z:y).map(lambda a: (a[0][1],1)).reduceByKey(add)
#.<COMPLETAR>
#.<COMPLETAR>
#.<COMPLETAR>
#.<COMPLETAR>
#)
crimesWeekDayRegionRDD = CrimeHeadlessRDD.map(lambda x:((x.DayOfWeek,x.PdDistrict),1)).reduceByKey(add).map(lambda (y,z):(y[0],(y[1],z)))
# .<COMPLETAR>
# .<COMPLETAR>
# .<COMPLETAR>
# )
RegionAvgPerDayRDD = (crimesWeekDayRegionRDD
.join(totalDatesRDD)
.map(lambda ( dow, ( (pd,c), cds ) ):( dow, (pd, c / float(cds))))#<COMPLETAR>
.groupByKey()#<COMPLETAR>
.map(lambda x:(x[0],dict(x[1])))#<COMPLETAR>
)
RegionAvg = RegionAvgPerDayRDD.collectAsMap()
print RegionAvg['Sunday']
In [71]:
assert np.round(RegionAvg['Sunday']['BAYVIEW'],2)==37.27, 'valores incorretos {}'.format(np.round(RegionAvg[0][2],2))
print "OK"
Keys. Itere essa variável realizando os seguintes passos:key[0] não existir no dicionário, crie a entrada key[0] como um dicionário vazio.key e gerando a RDD com os valores da tupla. Note que não queremos uma lista de listas.mean() e stdev() do PySpark, armazenando na chave RegionAvgSpark[ key[0] ][ key[1] ].
In [92]:
# EXERCICIO
countWeekDayDistRDD = (CrimeHeadlessRDD.map(lambda x:((day2str(x.Dates),x.DayOfWeek,x.PdDistrict),1))
.reduceByKey(add)
.map(lambda ((d,dow,pd),c):((dow,pd),c))#<COMPLETAR>
.groupByKey()#<COMPLETAR>
#.<COMPLETAR>
#.<COMPLETAR>
)
# Esse procedimento só é viável se existirem poucas chaves
RegionAvgSpark = {}
Keys = countWeekDayDistRDD.map(lambda rec: rec[0]).collect()
for key in Keys:
listRDD = (countWeekDayDistRDD
.filter(lambda rec: rec[0]==key)
.flatMap(lambda rec: rec[1])
)
if key[0] not in RegionAvgSpark:
RegionAvgSpark[key[0]] = {}
RegionAvgSpark[key[0]][key[1]] = (listRDD.mean(), listRDD.stdev())
print RegionAvgSpark['Sunday']
In [93]:
assert np.round(RegionAvgSpark['Sunday']['BAYVIEW'][0],2)==37.39 and np.round(RegionAvgSpark['Sunday']['BAYVIEW'][1],2)==10.06, 'valores incorretos'
print "OK"
matplotlib que já vem por padrão na maioria das distribuições do Python (ex.: Anaconda). Outras bibliotecas alternativas interessantes são: Seaborn e Bokeh.count() armazenando na variável totalDays. Não se esqueça de converter o valor para float.zip() do Python é possível descompactar um dicionário em duas variáveis, uma com as chaves e outra com os valores. Utilizaremos essas variáveis para a plotagem do gráfico.
In [23]:
%matplotlib inline
import matplotlib.pyplot as plt
# Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,COORD
# Lambda para converter um datetime em `Dia-Mes-Ano`
day2str = lambda x: '{}-{}-{}'.format(x.day,x.month,x.year)
totalDatesRDD = (CrimeHeadlessRDD
.map(lambda rec: (day2str(rec.Dates),1))
.reduceByKey(lambda x,y: x)
)
totalDays = float(totalDatesRDD.count())
avgCrimesRegionRDD = (RegionCountRDD
.map(lambda rec: (rec[0],rec[1]/totalDays))
)
Xticks,Y = zip(*avgCrimesRegionRDD.collectAsMap().items())
indices = np.arange(len(Xticks))
width = 0.35
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.bar(indices,Y, width)
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass
Y em que a chave é o dia da semana e o valor é uma np.array contendo a média de cada região para aquele dia.Bottom que determina qual é o início de cada uma das barras. O início da barra do dia i deve ser o final da barra do dia i-1.Bottom daquele dia.
In [76]:
# Dias da semana como referência
Day = ['Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday']
# Uma cor para cada dia
Color = ['r','b','g','y','c','k','purple']
# Dicionário (dia, array de médias)
Y = {}
for day in Day:
Y[day] = np.array([RegionAvg[day][x] for x in Xticks])
# Matriz dias x regiões
Bottom = np.zeros( (len(Day),len(Xticks)) )
for i in range(1,len(Day)):
Bottom[i,:] = Bottom[i-1,:]+Y[Day[i-1]]
indices = np.arange(len(Xticks))
width = 0.35
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
# Gera uma lista de plots, um para cada dia
plots = [plt.bar(indices,Y[Day[i]], width, color=Color[i], bottom=Bottom[i]) for i in range(len(Day))]
plt.legend( [p[0] for p in plots], Day,loc='center left', bbox_to_anchor=(1, 0.5) )
plt.grid(b=True, which='major', axis='y')
plt.xticks(indices+width/2., Xticks, rotation=17 )
plt.ylabel('Number of crimes')
plt.xlabel('Region')
pass
In [91]:
# EXERCICIO
parseWeekday = lambda x: '{}-{}-{}'.format(x.day, x.month, x.year)
hoursRDD = (CrimeHeadlessRDD
.map(lambda x: ((x.Dates.hour,parseWeekday (x.Dates)),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
#.<COMPLETAR>
#.<COMPLETAR>
)
crimePerHourRDD = (CrimeHeadlessRDD
# .map(lambda x: (x.Dates.hour)<COMPLETAR>
# .<COMPLETAR>
# )
#avgCrimeHourRDD = (crimePerHourRDD
# .<COMPLETAR>
# .<COMPLETAR>
# )
#crimePerHour = avgCrimeHourRDD.collect()
#print crimePerHour[0:5]
In [ ]:
assert np.round(crimePerHour[0][1],2)==19.96, 'valores incorretos'
print "OK"
In [ ]:
crimePerHourSort = sorted(crimePerHour,key=lambda x: x[0])
X,Y = zip(*crimePerHourSort)
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.plot(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('Avg. Number of crimes')
plt.xlabel('Hour')
pass
dict() nessa lista para obtermos uma RDD no seguinte formato: (Mes-Ano, {CRIME: quantidade}).crimes contendo a lista de crimes contidas na lista de pares catCount computada anteriormente.
In [104]:
# EXERCICIO
parseMonthYear = lambda x: '{}-{}'.format(x.month, x.year)
crimes = map(lambda x: x[0], catCount)
datesCrimesRDD = (CrimeHeadlessRDD
.map(lambda x: ((parseMonthYear(x.Dates),x.Category),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.map(lambda ((ma,crime),contagem):(ma,(crime,contagem)))#<COMPLETAR>
.groupByKey()#<COMPLETAR>
.map(lambda y:(y[0],dict(y[1])))#<COMPLETAR>
)
#print datesCrimesRDD.collect()
print datesCrimesRDD.take(1)
In [105]:
assert datesCrimesRDD.take(1)[0][1][u'KIDNAPPING']==12,'valores incorretos'
print 'ok'
fractionCrimesDateRDD em que a chave é Mes-Ano e o valor é uma lista da fração de cada tipo de crime ocorridos naquele mês e ano. Para gerar essa lista vamos utilizar o list comprehension do Python de tal forma a calcular a fração para cada crime na variável crimes.get() que permite atribuir um valor padrão caso a chave não exista. Ex.: dicionario.get( chave, 0.0) retornará 0.0 caso a chave não exista.
In [133]:
# EXERCICIO
totalPerDateRDD = (CrimeHeadlessRDD
.map(lambda x:(parseMonthYear(x.Dates),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
)
fractionCrimesDateRDD = (datesCrimesRDD
.map(lambda x:(x[0],[i for i in crimes if i == x[1]]))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.cache()
)
print fractionCrimesDateRDD.take(1)
In [134]:
assert np.abs(fractionCrimesDateRDD.take(1)[0][1][0][1]-0.163950)<1e-6,'valores incorretos'
print 'ok'
Statistics.corr() da biblioteca pyspark.mlllib.stat.
In [ ]:
from pyspark.mllib.stat import Statistics
corr = Statistics.corr(fractionCrimesDateRDD.map(lambda rec: map(lambda x: x[1],rec[1])))
print corr
In [ ]:
npCorr = np.array(corr)
rowMin = npCorr.min(axis=1).argmin()
colMin = npCorr[rowMin,:].argmin()
print crimes[rowMin], crimes[colMin], npCorr[rowMin,colMin]
npCorr[npCorr==1.] = 0.
rowMax = npCorr.max(axis=1).argmax()
colMax = npCorr[rowMax,:].argmax()
print crimes[rowMax], crimes[colMax], npCorr[rowMax,colMax]
var1RDD e var2RDD. Elas são um mapeamento da fractionCrimesDateRDD filtradas para conter apenas o crime contido em Xlabel e Ylabel, respectivamente.correlationRDD que mapeará para tuplas de valores, onde os valores são as médias calculadas em fractionCrimesDateRDD.
In [ ]:
# EXERCICIO
Xlabel = 'FORGERY/COUNTERFEITING'#'DRIVING UNDER THE INFLUENCE'
Ylabel = 'NON-CRIMINAL'#'LIQUOR LAWS'
var1RDD = (fractionCrimesDateRDD
.map(lambda rec: (rec[0], filter(lambda x: x[0]==Xlabel,rec[1])[0][1]))
)
var2RDD = (fractionCrimesDateRDD
.map(lambda rec: (rec[0], filter(lambda x: x[0]==Ylabel,rec[1])[0][1]))
)
correlationRDD = (var1RDD
.<COMPLETAR>
.<COMPLETAR>
)
Data = correlationRDD.collect()
print Data[0]
In [ ]:
assert np.abs(Data[0][0]-0.015904)<1e-6, 'valores incorretos'
print 'ok'
In [ ]:
X,Y = zip(*Data)
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.scatter(X,Y)
plt.grid(b=True, which='major', axis='y')
plt.xlabel(Xlabel)
plt.ylabel(Ylabel)
pass
bookedRDD que contém apenas os registros contendo ARREST no campo Resolution (lembre-se que esse campo é uma lista) e contabilizar a quantidade de registros em cada 'Mes-Ano'. Ao final, vamos mapear para uma RDD contendo apenas os valores contabilizados.
In [120]:
# EXERCICIO
bookedRDD = (CrimeHeadlessRDD
.filter(lambda x: u'"ARREST' in x.Resolution)#<COMPLETAR>
.map(lambda y:(parseMonthYear(y.Dates),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.map(lambda z:z[1])#.<COMPLETAR>
)
Data = bookedRDD.collect()
#print Data
print Data[:5]
In [121]:
assert Data[0]==1914,'valores incorretos'
print 'ok'
In [122]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.hist(Data)
plt.grid(b=True, which='major', axis='y')
plt.xlabel('ARRESTED')
pass
In [123]:
# EXERCICIO
parseDayMonth = lambda x: '{}-{}'.format(x.month,x.year)
bookedRDD = (CrimeHeadlessRDD
.filter(lambda x: u'"ARREST' in x.Resolution)#<COMPLETAR>
.map(lambda y:(parseMonthYear(y.Dates),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.map(lambda z:z[1])#.<COMPLETAR>
)
robberyBookedRDD = (CrimeHeadlessRDD
.filter(lambda x: u'"ARREST' in x.Resolution and x.Category == "ROBBERY")#<COMPLETAR>
.map(lambda y:(parseMonthYear(y.Dates),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.map(lambda z:z[1])#.<COMPLETAR>
)
assaultBookedRDD = (CrimeHeadlessRDD
.filter(lambda x: u'"ARREST' in x.Resolution and x.Category == "ASSAULT")#<COMPLETAR>
.map(lambda y:(parseMonthYear(y.Dates),1))#<COMPLETAR>
.reduceByKey(add)#<COMPLETAR>
.map(lambda z:z[1])#.<COMPLETAR>
)
robData = robberyBookedRDD.collect()
assData = assaultBookedRDD.collect()
In [124]:
assert robData[0]==27,'valores incorretos'
print 'ok'
assert assData[0]==152,'valores incorretos'
print 'ok'
In [125]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.boxplot([robData,assData])
plt.grid(b=True, which='major', axis='y')
plt.ylabel('ARRESTED')
plt.xticks([1,2], ['ROBBERY','ASSAULT'])
pass
In [ ]: