In [1]:
ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
palavrasRDD = sc.parallelize(ListaPalavras, 4)
print type(palavrasRDD)
print palavrasRDD.collect()
map()
para aplicar a transformação em cada palavra do RDD.str.format()
.help()
. Vamos utilizar a padronização de documentação sugerida para o Python, manteremos essa documentação em inglês.
In [2]:
# EXERCICIO
def Plural(palavra):
"""Adds an 's' to `palavra`.
Args:
palavra (str): A string.
Returns:
str: A string with 's' added to it.
"""
return str.format(palavra+'s')
print Plural('gato')
In [3]:
help(Plural)
In [4]:
assert Plural('rato')=='ratos', 'resultado incorreto!'
print 'OK'
In [5]:
# EXERCICIO
pluralRDD = palavrasRDD.map(lambda x: x+'s')
print pluralRDD.collect()
In [6]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'
In [7]:
# EXERCICIO
pluralLambdaRDD = palavrasRDD.map(lambda x: x+'s')
print pluralLambdaRDD.collect()
In [8]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'
In [9]:
# EXERCICIO
pluralTamanho = (pluralRDD
.map(lambda x: len(x))
.collect()
)
print pluralTamanho
In [10]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print "OK"
1
para cada palavra.(k,v)
é chamada de RDD de tuplas ou pair RDD.map()
com uma função lambda()
.
In [11]:
# EXERCICIO
palavraPar = palavrasRDD.map(lambda x: (x, 1))
print palavraPar.collect()
In [12]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print "OK"
groupByKey()
In [13]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey()
for chave, valor in palavrasGrupo.collect():
print '{0}: {1}'.format(chave, list(valor))
print palavrasGrupo.mapValues(lambda x: list(x)).collect()
In [14]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) ==[('rato', [1, 1]), ('elefante', [1]), ('gato', [1, 1])], 'Valores incorretos!'
print "OK"
In [15]:
# EXERCICIO
contagemGroup = palavrasGrupo.mapValues(lambda x: sum(list(x)))
print contagemGroup.collect()
In [16]:
assert sorted(contagemGroup.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"
reduceByKey
reduce()
vista na aula anterior para os valores de cada chave. Dessa forma, a função de transformação pode ser aplicada em cada partição local para depois ser enviada para redistribuição de partições, reduzindo o total de dados sendo movidos e não mantendo listas grandes na memória.
In [17]:
# EXERCICIO
contagem = palavraPar.reduceByKey(lambda a, b : a+b)
print contagem.collect()
In [18]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"
In [19]:
# EXERCICIO
contagemFinal = (palavrasRDD
.map(lambda x : (x, 1))
.reduceByKey(lambda a, b: a+b)
)
print contagemFinal.collect()
In [20]:
assert sorted(contagemFinal.collect())==[('rato', 2), ('elefante', 1), ('gato', 2)], 'valores incorretos!'
print "OK"
In [21]:
# EXERCICIO
palavrasUnicas = (palavrasRDD
.map(lambda x : (x, 1))
.reduceByKey(lambda a, b: a+b)
).count()
print palavrasUnicas
In [22]:
assert palavrasUnicas==3, 'valor incorreto!'
print "OK"
print contagemFinal.collect()
contagem
.reduce()
é aplicada em cada tupla do RDD. Para realizar a soma das contagens, primeiro é necessário mapear o RDD para um RDD contendo apenas os valores das frequências (sem as chaves).
In [23]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
from operator import add
total = (contagemFinal
.map(lambda (x,y) : y)
.reduce(add)
)
media = total / float(palavrasUnicas)
print total
print round(media, 2)
In [24]:
assert round(media, 2)==1.67, 'valores incorretos!'
print "OK"
palavrasRDD.collect()
Out[24]:
contaPalavras
In [25]:
# EXERCICIO
def contaPalavras(chavesRDD):
"""Creates a pair RDD with word counts from an RDD of words.
Args:
chavesRDD (RDD of str): An RDD consisting of words.
Returns:
RDD of (str, int): An RDD consisting of (word, count) tuples.
"""
return (chavesRDD
.map(lambda x: (x, 1))
.reduceByKey(lambda x,y: x+y)
)
print contaPalavras(palavrasRDD).collect()
In [26]:
assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"
removerPontuacao
que converte todo o texto para minúscula, remove qualquer pontuação e espaços em branco no início ou final da palavra. Para isso, utilize a biblioteca re para remover todo texto que não seja letra, número ou espaço, encadeando com as funções de string para remover espaços em branco e converter para minúscula (veja Strings).
In [27]:
# EXERCICIO
import re
def removerPontuacao(texto):
"""Removes punctuation, changes to lower case, and strips leading and trailing spaces.
Note:
Only spaces, letters, and numbers should be retained. Other characters should should be
eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
punctuation is removed.
Args:
texto (str): A string.
Returns:
str: The cleaned up string.
"""
return re.sub(r'[^A-Za-z0-9 ]', '', texto).strip().lower()
print removerPontuacao('Ola, quem esta ai??!')
print removerPontuacao(' Sem espaco e_sublinhado!')
In [28]:
assert removerPontuacao(' O uso de virgulas, embora permitido, nao deve contar. ')=='o uso de virgulas embora permitido nao deve contar', 'string incorreta!'
print "OK"
textFile()
que recebe como entrada o nome do arquivo texto que queremos utilizar e o número de partições.removerPontuacao()
para normalizar o texto e verificar as 15 primeiras linhas com o comando take()
.
In [29]:
# Apenas execute a célula
import os.path
import urllib
url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt' # url do livro
arquivo = os.path.join('Data','Aula02','pg100.txt') # local de destino: 'Data/Aula02/shakespeare.txt'
if os.path.isfile(arquivo): # verifica se já fizemos download do arquivo
print 'Arquivo já existe!'
else:
try:
urllib.urlretrieve(url, arquivo) # salva conteúdo da url em arquivo
except IOError:
print 'Impossível fazer o download: {0}'.format(url)
# lê o arquivo com textFile e aplica a função removerPontuacao
shakesRDD = (sc
.textFile(arquivo, 8)
.map(removerPontuacao)
)
# zipWithIndex gera tuplas (conteudo, indice) onde indice é a posição do conteudo na lista sequencial
# Ex.: sc.parallelize(['gato','cachorro','boi']).zipWithIndex() ==> [('gato',0), ('cachorro',1), ('boi',2)]
# sep.join() junta as strings de uma lista através do separador sep. Ex.: ','.join(['a','b','c']) ==> 'a,b,c'
print '\n'.join(shakesRDD
.zipWithIndex()
.map(lambda (linha, num): '{0}: {1}'.format(num,linha))
.take(15)
)
contaPalavras()
, temos ainda que trabalhar em cima da nossa RDD:map()
para gerar um novo RDD como uma lista de palavras.
In [30]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.map(lambda x: x.split())
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.take(5)
print total
map()
gera uma lista para cada linha, criando um RDD contendo uma lista de listas.flatMap()
que aplica a transformação do map()
, porém achatando o retorno em forma de lista para uma lista unidimensional.
In [31]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.flatMap(lambda x: x.split())
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.top(5)
print total
In [32]:
assert total==927631 or total == 928908, "valor incorreto de palavras!"
print "OK"
assert shakesPalavrasRDD.top(5)==[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],'lista incorreta de palavras'
print "OK"
In [33]:
# EXERCICIO
shakesLimpoRDD = shakesPalavrasRDD.filter(lambda x: len(x) > 0)
total = shakesLimpoRDD.count()
print total
In [34]:
assert total==882996, 'valor incorreto!'
print "OK"
contaPalavras()
.takeOrdered
para imprimir as 15 palavras mais frequentes.takeOrdered()
pode receber um segundo parâmetro que instrui o Spark em como ordenar os elementos. Ex.:takeOrdered(15, key=lambda x: -x)
: ordem decrescente dos valores de x
In [58]:
# EXERCICIO
top15 = contaPalavras(shakesLimpoRDD).takeOrdered(15, key=lambda x: -x[1])
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15))
In [59]:
print top15
assert top15 == [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
(u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
(u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],'valores incorretos!'
print "OK"
In [1]:
import numpy as np
# Vamos criar uma função pNorm que recebe como parâmetro p e retorna uma função que calcula a pNorma
def pNorm(p):
"""Generates a function to calculate the p-Norm between two points.
Args:
p (int): The integer p.
Returns:
Dist: A function that calculates the p-Norm.
"""
def Dist(x,y):
return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
return Dist
In [2]:
# Vamos criar uma RDD com valores numéricos
numPointsRDD = sc.parallelize(enumerate(np.random.random(size=(10,100))))
In [41]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartPointsRDD = numPointsRDD.cartesian(numPointsRDD)
# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartPointsParesRDD = cartPointsRDD.map(lambda vetor: ((vetor[0][0],vetor[1][0]), (vetor[0][1],vetor[1][1])))
#print cartPointsParesRDD.take(1)
# Aplique um mapa para calcular a Distância Euclidiana entre os pares
Euclid = pNorm(2)
distRDD = cartPointsParesRDD.map(lambda vetor: (vetor[0], Euclid(vetor[1][0],vetor[1][1])))
# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statRDD = distRDD.map(lambda vetor: vetor[1])
minv, maxv, meanv = statRDD.min(), statRDD.max(), statRDD.mean()
print minv.round(2), maxv.round(2), meanv.round(2)
In [42]:
assert (minv.round(2), maxv.round(2), meanv.round(2))==(0.0, 4.70, 3.65), 'Valores incorretos'
print "OK"
In [83]:
# Vamos criar uma função para calcular a distância de Hamming
def Hamming(x,y):
"""Calculates the Hamming distance between two binary vectors.
Args:
x, y (np.array): Array of binary integers x and y.
Returns:
H (int): The Hamming distance between x and y.
"""
return (x!=y).sum()
# Vamos criar uma função para calcular a distância de Jaccard
def Jaccard(x,y):
"""Calculates the Jaccard distance between two binary vectors.
Args:
x, y (np.array): Array of binary integers x and y.
Returns:
J (int): The Jaccard distance between x and y.
"""
return (x==y).sum()/float( np.maximum(x,y).sum() )
In [92]:
# Vamos criar uma RDD com valores categóricos
catPointsRDD = sc.parallelize(enumerate([['alto', 'caro', 'azul'],
['medio', 'caro', 'verde'],
['alto', 'barato', 'azul'],
['medio', 'caro', 'vermelho'],
['baixo', 'barato', 'verde'],
]))
#print catPointsRDD.collect()
#print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).collect()
#print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).reduceByKey(lambda x,z: x).collect()
print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).reduceByKey(lambda x,z: x).map(lambda x: x[0]).collect()
In [101]:
# EXERCICIO
# Crie um RDD de chaves únicas utilizando flatMap
#print catPointsRDD.collect()
#print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).collect()
#print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).reduceByKey(lambda x,z: x).collect()
#print catPointsRDD.flatMap(lambda x: map(lambda z: (z, 1), x[1])).reduceByKey(lambda x,z: x).map(lambda x: x[0]).collect()
chavesRDD = (catPointsRDD
.flatMap(lambda x: map(lambda z: (z, 1), x[1]))
.reduceByKey(lambda x, y: x)
.map(lambda x: x[0])
)
chaves = dict((v,k) for k,v in enumerate(chavesRDD.collect()))
nchaves = len(chaves)
print chaves, nchaves
In [102]:
assert chaves=={'alto': 0, 'medio': 7, 'baixo': 5, 'barato': 2, 'azul': 4, 'verde': 6, 'caro': 3, 'vermelho': 1}, 'valores incorretos!'
print "OK"
assert nchaves==8, 'número de chaves incorreta'
print "OK"
In [105]:
def CreateNP(atributos,chaves):
"""Binarize the categorical vector using a dictionary of keys.
Args:
atributos (list): List of attributes of a given object.
chaves (dict): dictionary with the relation attribute -> index
Returns:
array (np.array): Binary array of attributes.
"""
array = np.zeros(len(chaves))
for atr in atributos:
array[ chaves[atr] ] = 1
return array
# Converte o RDD para o formato binário, utilizando o dict chaves
#print catPointsRDD.collect()
binRDD = catPointsRDD.map(lambda rec: (rec[0],CreateNP(rec[1], chaves)))
binRDD.collect()
Out[105]:
In [ ]:
In [112]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartBinRDD = binRDD.cartesian(binRDD)
# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartBinParesRDD = cartBinRDD.map(lambda matrix: ((matrix[0][0], matrix[1][0]), (matrix[0][1], matrix[1][1])))
# Aplique um mapa para calcular a Distância de Hamming e Jaccard entre os pares
hamRDD = cartBinParesRDD.map(lambda matrix: (matrix[0], Hamming(matrix[1][0],matrix[1][1])))
jacRDD = cartBinParesRDD.map(lambda matrix: (matrix[0], Jaccard(matrix[1][0],matrix[1][1])))
# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
statHRDD = hamRDD.map(lambda matrix: matrix[1])
statJRDD = jacRDD.map(lambda matrix: matrix[1])
Hmin, Hmax, Hmean = statHRDD.min(), statHRDD.max(), statHRDD.mean()
Jmin, Jmax, Jmean = statJRDD.min(), statJRDD.max(), statJRDD.mean()
print "\t\tMin\tMax\tMean"
print "Hamming:\t{:.2f}\t{:.2f}\t{:.2f}".format(Hmin, Hmax, Hmean )
print "Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean )
In [113]:
assert (Hmin.round(2), Hmax.round(2), Hmean.round(2)) == (0.00,6.00,3.52), 'valores incorretos'
print "OK"
assert (Jmin.round(2), Jmax.round(2), Jmean.round(2)) == (0.33,2.67,1.14), 'valores incorretos'
print "OK"
In [ ]: