Spark + Python = PySpark

Esse notebook introduz os conceitos básicos do Spark através de sua interface com a linguagem Python. Como aplicação inicial faremos o clássico examplo de contador de palavras . Com esse exemplo é possível entender a lógica de programação funcional para as diversas tarefas de exploração de dados distribuídos.

Para isso utilizaremos o livro texto Trabalhos completos de William Shakespeare obtidos do Projeto Gutenberg. Veremos que esse mesmo algoritmo pode ser empregado em textos de qualquer tamanho.

Esse notebook contém:

Parte 1: Criando uma base RDD e RDDs de tuplas

Parte 2: Manipulando RDDs de tuplas

Parte 3: Encontrando palavras únicas e calculando médias

Parte 4: Aplicar contagem de palavras em um arquivo

Parte 5: Similaridade entre Objetos

Para os exercícios é aconselhável consultar a documentação da API do PySpark

Part 1: Criando e Manipulando RDDs

Nessa parte do notebook vamos criar uma base RDD a partir de uma lista com o comando parallelize.

(1a) Criando uma base RDD

Podemos criar uma base RDD de diversos tipos e fonte do Python com o comando sc.parallelize(fonte, particoes), sendo fonte uma variável contendo os dados (ex.: uma lista) e particoes o número de partições para trabalhar em paralelo.


In [1]:
from pyspark import SparkContext
sc =SparkContext()
ListaPalavras = ['gato', 'elefante', 'rato', 'rato', 'gato']
palavrasRDD = sc.parallelize(ListaPalavras, 4)
print type(palavrasRDD)


<class 'pyspark.rdd.RDD'>

(1b) Plural

Vamos criar uma função que transforma uma palavra no plural adicionando uma letra 's' ao final da string. Em seguida vamos utilizar a função map() para aplicar a transformação em cada palavra do RDD.

Em Python (e muitas outras linguagens) a concatenação de strings é custosa. Uma alternativa melhor é criar uma nova string utilizando str.format().

Nota: a string entre os conjuntos de três aspas representa a documentação da função. Essa documentação é exibida com o comando help(). Vamos utilizar a padronização de documentação sugerida para o Python, manteremos essa documentação em inglês.


In [2]:
# EXERCICIO
def Plural(palavra):
    """Adds an 's' to `palavra`.

    Args:
        palavra (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return "{0}{1}".format(palavra,"s")#<COMPLETAR>

print Plural('gato')


gatos

In [3]:
help(Plural)


Help on function Plural in module __main__:

Plural(palavra)
    Adds an 's' to `palavra`.
    
    Args:
        palavra (str): A string.
    
    Returns:
        str: A string with 's' added to it.


In [4]:
assert Plural('rato')=='ratos', 'resultado incorreto!'
print 'OK'


OK

(1c) Aplicando a função ao RDD

Transforme cada palavra do nosso RDD em plural usando map()

Em seguida, utilizaremos o comando collect() que retorna a RDD como uma lista do Python.


In [5]:
# EXERCICIO
pluralRDD = palavrasRDD.map(Plural)#<COMPLETAR>
print pluralRDD.collect()


['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']

In [6]:
assert pluralRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'


OK

Nota: utilize o comando collect() apenas quando tiver certeza de que a lista caberá na memória. Para gravar os resultados de volta em arquivo texto ou base de dados utilizaremos outro comando.

(1d) Utilizando uma função lambda

Repita a criação de um RDD de plurais, porém utilizando uma função lambda.


In [8]:
# EXERCICIO
pluralLambdaRDD = palavrasRDD.map(lambda x: "{0}{1}".format(x,"s"))#<COMPLETAR>
print pluralLambdaRDD.collect()


['gatos', 'elefantes', 'ratos', 'ratos', 'gatos']

In [9]:
assert pluralLambdaRDD.collect()==['gatos','elefantes','ratos','ratos','gatos'], 'valores incorretos!'
print 'OK'


OK

(1e) Tamanho de cada palavra

Agora use map() e uma função lambda para retornar o número de caracteres em cada palavra. Utilize collect() para armazenar o resultado em forma de listas na variável destino.


In [12]:
# EXERCICIO
pluralTamanho = (pluralRDD.map(lambda x: len(x))
                 #<COMPLETAR>
                 ).collect()
print pluralTamanho


[5, 9, 5, 5, 5]

In [13]:
assert pluralTamanho==[5,9,5,5,5], 'valores incorretos'
print "OK"


OK

(1f) RDDs de pares e tuplas

Para contar a frequência de cada palavra de maneira distribuída, primeiro devemos atribuir um valor para cada palavra do RDD. Isso irá gerar um base de dados (chave, valor). Desse modo podemos agrupar a base através da chave, calculando a soma dos valores atribuídos. No nosso caso, vamos atribuir o valor 1 para cada palavra.

Um RDD contendo a estrutura de tupla chave-valor (k,v) é chamada de RDD de tuplas ou pair RDD.

Vamos criar nosso RDD de pares usando a transformação map() com uma função lambda().


In [14]:
# EXERCICIO
palavraPar = palavrasRDD.map(lambda x: (x,1))#<COMPLETAR>
print palavraPar.collect()


[('gato', 1), ('elefante', 1), ('rato', 1), ('rato', 1), ('gato', 1)]

In [15]:
assert palavraPar.collect() == [('gato',1),('elefante',1),('rato',1),('rato',1),('gato',1)], 'valores incorretos!'
print "OK"


OK

Parte 2: Manipulando RDD de tuplas

Vamos manipular nossa RDD para contar as palavras do texto.

(2a) Função groupByKey()

A função groupByKey() agrupa todos os valores de um RDD através da chave (primeiro elemento da tupla) agregando os valores em uma lista.

Essa abordagem tem um ponto fraco pois:

  • #### A operação requer que os dados distribuídos sejam movidos em massa para que permaneçam na partição correta.
  • #### As listas podem se tornar muito grandes. Imagine contar todas as palavras do Wikipedia: termos comuns como "a", "e" formarão uma lista enorme de valores que pode não caber na memória do processo escravo.

In [16]:
# EXERCICIO
palavrasGrupo = palavraPar.groupByKey()
for chave, valor in palavrasGrupo.collect():
    print '{0}: {1}'.format(chave, list(valor))


rato: [1, 1]
elefante: [1]
gato: [1, 1]

In [19]:
assert sorted(palavrasGrupo.mapValues(lambda x: list(x)).collect()) == [('elefante', [1]), ('gato',[1, 1]), ('rato',[1, 1])],        'Valores incorretos!'
print "OK"


OK

(2b) Calculando as contagens

Após o groupByKey() nossa RDD contém elementos compostos da palavra, como chave, e um iterador contendo todos os valores correspondentes aquela chave.

Utilizando a transformação mapValues() e a função sum(), contrua um novo RDD que consiste de tuplas (chave, soma).


In [21]:
# EXERCICIO
contagemGroup = palavrasGrupo.mapValues(lambda x: sum (x))#<COMPLETAR>
print contagemGroup.collect()


[('rato', 2), ('elefante', 1), ('gato', 2)]

In [22]:
assert list(sorted(contagemGroup.collect()))==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"


OK

(2c) reduceByKey

Um comando mais interessante para a contagem é o reduceByKey() que cria uma nova RDD de tuplas.

Essa transformação aplica a transformação reduce() vista na aula anterior para os valores de cada chave. Dessa forma, a função de transformação pode ser aplicada em cada partição local para depois ser enviada para redistribuição de partições, reduzindo o total de dados sendo movidos e não mantendo listas grandes na memória.


In [26]:
# EXERCICIO
from operator import add
contagem = palavraPar.reduceByKey(add)#<COMPLETAR>
print contagem.collect()


[('rato', 2), ('elefante', 1), ('gato', 2)]

In [27]:
assert sorted(contagem.collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"


OK

(2d) Agrupando os comandos

A forma mais usual de realizar essa tarefa, partindo do nosso RDD palavrasRDD, é encadear os comandos map e reduceByKey em uma linha de comando.


In [34]:
# EXERCICIO
contagemFinal = (palavrasRDD.map(lambda x:(x,1)).reduceByKey(add)
                 #<COMPLETAR>
                 #<COMPLETAR>
                 )
contagemFinal = contagemFinal.collect()
print contagemFinal


[('rato', 2), ('elefante', 1), ('gato', 2)]

In [35]:
assert sorted(contagemFinal)==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"


OK

Parte 3: Encontrando as palavras únicas e calculando a média de contagem

(3a) Palavras Únicas

Calcule a quantidade de palavras únicas do RDD. Utilize comandos de RDD da API do PySpark e alguma das últimas RDDs geradas nos exercícios anteriores.


In [69]:
# EXERCICIO
palavrasUnicas = (palavrasRDD.map(lambda x:(x,1)).reduceByKey(lambda y,z:1)).collect()#<COMPLETAR>
palavrasUnicas = len(palavrasUnicas)
print palavrasUnicas


3

In [70]:
assert palavrasUnicas==3, 'valor incorreto!'
print "OK"


OK

(3b) Calculando a Média de contagem de palavras

Encontre a média de frequência das palavras utilizando o RDD contagem.

Note que a função do comando reduce() é aplicada em cada tupla do RDD. Para realizar a soma das contagens, primeiro é necessário mapear o RDD para um RDD contendo apenas os valores das frequências (sem as chaves).


In [90]:
# EXERCICIO
# add é equivalente a lambda x,y: x+y
palavrasRDD2 = sc.parallelize(contagemFinal)
#print palavrasRDD2.collect()
total = (palavrasRDD2.map(lambda x:(x[1])).reduce(add))
media = total / float(palavrasUnicas)
print total
print round(media, 2)


5
1.67

In [91]:
assert round(media, 2)==1.67, 'valores incorretos!'
print "OK"


OK

Parte 4: Aplicar nosso algoritmo em um arquivo

(4a) Função contaPalavras

Para podermos aplicar nosso algoritmo genéricamente em diversos RDDs, vamos primeiro criar uma função para aplicá-lo em qualquer fonte de dados. Essa função recebe de entrada um RDD contendo uma lista de chaves (palavras) e retorna um RDD de tuplas com as chaves e a contagem delas nessa RDD


In [94]:
# EXERCICIO
def contaPalavras(chavesRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        chavesRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (chavesRDD.map(lambda x: (x,1)).reduceByKey(add)
            #<COMPLETAR>
            #<COMPLETAR>
           )

print contaPalavras(palavrasRDD).collect()


[('rato', 2), ('elefante', 1), ('gato', 2)]

In [95]:
assert sorted(contaPalavras(palavrasRDD).collect())==[('elefante',1), ('gato',2), ('rato',2)], 'valores incorretos!'
print "OK"


OK

(4b) Normalizando o texto

Quando trabalhamos com dados reais, geralmente precisamos padronizar os atributos de tal forma que diferenças sutis por conta de erro de medição ou diferença de normatização, sejam desconsideradas. Para o próximo passo vamos padronizar o texto para:

  • #### Padronizar a capitalização das palavras (tudo maiúsculo ou tudo minúsculo).
  • #### Remover pontuação.
  • #### Remover espaços no início e no final da palavra.

Crie uma função removerPontuacao que converte todo o texto para minúscula, remove qualquer pontuação e espaços em branco no início ou final da palavra. Para isso, utilize a biblioteca re para remover todo texto que não seja letra, número ou espaço, encadeando com as funções de string para remover espaços em branco e converter para minúscula (veja Strings).


In [96]:
# EXERCICIO
import re
def removerPontuacao(texto):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        texto (str): A string.

    Returns:
        str: The cleaned up string.
    """
    return re.sub(r'[^A-Za-z0-9 ]', '', texto).strip().lower()
print removerPontuacao('Ola, quem esta ai??!')
print removerPontuacao(' Sem espaco e_sublinhado!')


ola quem esta ai
sem espaco esublinhado

In [97]:
assert removerPontuacao(' O uso de virgulas, embora permitido, nao deve contar. ')=='o uso de virgulas embora permitido nao deve contar', 'string incorreta!'
print "OK"


OK

(4c) Carregando arquivo texto

Para a próxima parte vamos utilizar o livro Trabalhos completos de William Shakespeare do Projeto Gutenberg.

Para converter um texto em uma RDD, utilizamos a função textFile() que recebe como entrada o nome do arquivo texto que queremos utilizar e o número de partições.

O nome do arquivo texto pode se referir a um arquivo local ou uma URI de arquivo distribuído (ex.: hdfs://).

Vamos também aplicar a função removerPontuacao() para normalizar o texto e verificar as 15 primeiras linhas com o comando take().


In [108]:
# Apenas execute a célula
import os.path
import urllib2

url = 'http://www.gutenberg.org/cache/epub/100/pg100.txt' # url do livro

arquivo = os.path.join('Data','Aula02','shakespeare.txt') # local de destino: 'Data/Aula02/shakespeare.txt'

if os.path.isfile(arquivo):     # verifica se já fizemos download do arquivo
    print 'Arquivo já existe!'
else:
    try:        
        response = urllib2.urlopen(url)
        arquivo = (response.read()).split() #ja gera uma lista de palavras
    except IOError:
        print 'Impossível fazer o download: {0}'.format(url)

# lê o arquivo com textFile e aplica a função removerPontuacao        
shakesRDD = (sc.textFile(arquivo).map(removerPontuacao))

# zipWithIndex gera tuplas (conteudo, indice) onde indice é a posição do conteudo na lista sequencial
# Ex.: sc.parallelize(['gato','cachorro','boi']).zipWithIndex() ==> [('gato',0), ('cachorro',1), ('boi',2)]
# sep.join() junta as strings de uma lista através do separador sep. Ex.: ','.join(['a','b','c']) ==> 'a,b,c'
print '\n'.join(shakesRDD
                .zipWithIndex()
                .map(lambda (linha, num): '{0}: {1}'.format(num,linha))
                .take(15)
               )


Arquivo já existe!
0: the project gutenberg ebook of the complete works of william shakespeare by
1: william shakespeare
2: 
3: this ebook is for the use of anyone anywhere at no cost and with
4: almost no restrictions whatsoever  you may copy it give it away or
5: reuse it under the terms of the project gutenberg license included
6: with this ebook or online at wwwgutenbergorg
7: 
8: this is a copyrighted project gutenberg ebook details below
9: please follow the copyright guidelines in this file
10: 
11: title the complete works of william shakespeare
12: 
13: author william shakespeare
14: 

(4d) Extraindo as palavras

Antes de poder usar nossa função Before we can use the contaPalavras(), temos ainda que trabalhar em cima da nossa RDD:

  • #### Precisamos gerar listas de palavras ao invés de listas de sentenças.
  • #### Eliminar linhas vazias.

As strings em Python tem o método split() que faz a separação de uma string por separador. No nosso caso, queremos separar as strings por espaço.

Utilize a função map() para gerar um novo RDD como uma lista de palavras.


In [119]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.map(lambda x: x.split())#<COMPLETAR>
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.take(5)
print total


[[u'the', u'project', u'gutenberg', u'ebook', u'of', u'the', u'complete', u'works', u'of', u'william', u'shakespeare', u'by'], [u'william', u'shakespeare'], [], [u'this', u'ebook', u'is', u'for', u'the', u'use', u'of', u'anyone', u'anywhere', u'at', u'no', u'cost', u'and', u'with'], [u'almost', u'no', u'restrictions', u'whatsoever', u'you', u'may', u'copy', u'it', u'give', u'it', u'away', u'or']]
124787

Conforme deve ter percebido, o uso da função map() gera uma lista para cada linha, criando um RDD contendo uma lista de listas.

Para resolver esse problema, o Spark possui uma função análoga chamada flatMap() que aplica a transformação do map(), porém achatando o retorno em forma de lista para uma lista unidimensional.


In [120]:
# EXERCICIO
shakesPalavrasRDD = shakesRDD.flatMap(lambda x: x.split())
total = shakesPalavrasRDD.count()
print shakesPalavrasRDD.top(5)
print total


[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
903705

Nota: os asserts abaixo de contagem de palavra podem falhar por diferença de formato do arquivo .txt antigo e novo. Eu avaliarei somente os códigos nesse trecho.


In [122]:
#assert total==927631 or total == 928908, "valor incorreto de palavras!"
#print "OK"
assert shakesPalavrasRDD.top(5)==[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],'lista incorreta de palavras'
print "OK"


OK

(4e) Remover linhas vazias

Para o próximo passo vamos filtrar as linhas vazias com o comando filter(). Uma linha vazia é uma string sem nenhum conteúdo.


In [129]:
# EXERCICIO
shakesLimpoRDD = shakesPalavrasRDD.filter(lambda x: len(x)>0)#<COMPLETAR>
total = shakesLimpoRDD.count()
print total


903705

In [126]:
assert total==882996, 'valor incorreto!'
print "OK"


---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-126-eaded5957490> in <module>()
----> 1 assert total==882996, 'valor incorreto!'
      2 print "OK"

AssertionError: valor incorreto!

(4f) Contagem de palavras

Agora que nossa RDD contém uma lista de palavras, podemos aplicar nossa função contaPalavras().

Aplique a função em nossa RDD e utilize a função takeOrdered para imprimir as 15 palavras mais frequentes.

takeOrdered() pode receber um segundo parâmetro que instrui o Spark em como ordenar os elementos. Ex.:

takeOrdered(15, key=lambda x: -x): ordem decrescente dos valores de x


In [140]:
# EXERCICIO
#print contaPalavras(shakesLimpoRDD).collect()
top15 = contaPalavras(shakesLimpoRDD).takeOrdered(15, lambda x: -x[1])#<COMPLETAR>
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15))


the: 27825
and: 26791
i: 20681
to: 19261
of: 18289
a: 14667
you: 13716
my: 12481
that: 11135
in: 11027
is: 9621
not: 8745
for: 8261
with: 8046
me: 7769

In [141]:
assert top15 == [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],'valores incorretos!'
print "OK"


---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-141-6bbde664e0d3> in <module>()
      1 assert top15 == [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
      2                    (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
----> 3                    (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],'valores incorretos!'
      4 print "OK"

AssertionError: valores incorretos!

Parte 5: Similaridade entre Objetos

Nessa parte do laboratório vamos aprender a calcular a distância entre atributos numéricos, categóricos e textuais.

(5a) Vetores no espaço Euclidiano

Quando nossos objetos são representados no espaço Euclidiano, medimos a similaridade entre eles através da p-Norma definida por:

$$d(x,y,p) = (\sum_{i=1}^{n}{|x_i - y_i|^p})^{1/p}$$

As normas mais utilizadas são $p=1,2,\infty$ que se reduzem em distância absoluta, Euclidiana e máxima distância:

$$d(x,y,1) = \sum_{i=1}^{n}{|x_i - y_i|}$$

$$d(x,y,2) = (\sum_{i=1}^{n}{|x_i - y_i|^2})^{1/2}$$

$$d(x,y,\infty) = \max(|x_1 - y_1|,|x_2 - y_2|, ..., |x_n - y_n|)$$


In [142]:
import numpy as np

# Vamos criar uma função pNorm que recebe como parâmetro p e retorna uma função que calcula a pNorma
def pNorm(p):
    """Generates a function to calculate the p-Norm between two points.

    Args:
        p (int): The integer p.

    Returns:
        Dist: A function that calculates the p-Norm.
    """

    def Dist(x,y):
        return np.power(np.power(np.abs(x-y),p).sum(),1/float(p))
    return Dist

In [143]:
# Vamos criar uma RDD com valores numéricos
numPointsRDD = sc.parallelize(enumerate(np.random.random(size=(10,100))))

In [172]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma

cartPointsRDD = numPointsRDD.cartesian(numPointsRDD)#<COMPLETAR>

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartPointsParesRDD = cartPointsRDD.map(lambda ((x1,x2),(y1,y2)):((x1,y1),(x2,y2)))#<COMPLETAR>

#print cartPointsParesRDD
#Aplique um mapa para calcular a Distância Euclidiana entre os pares
Euclid = pNorm(2)
distRDD = cartPointsParesRDD.map(lambda ((x1,y1),(x2,y2)): Euclid(x2,y2))#<COMPLETAR>
#print(distRDD.collect())
# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
#statRDD = distRDD.<COMPLETAR>

#minv, maxv, meanv = statRDD.<COMPLETAR>, statRDD.<COMPLETAR>, statRDD.<COMPLETAR>
minv, maxv, meanv = distRDD.min(), distRDD.max(), distRDD.mean()#<COMPLETAR>
print minv, maxv, meanv


0.0 4.70083217229 3.74313159518

In [173]:
assert (minv.round(2), maxv.round(2), meanv.round(2))==(0.0, 4.70, 3.65), 'Valores incorretos'
print "OK"


---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-173-400e44f76b53> in <module>()
----> 1 assert (minv.round(2), maxv.round(2), meanv.round(2))==(0.0, 4.70, 3.65), 'Valores incorretos'
      2 print "OK"

AssertionError: Valores incorretos

(5b) Valores Categóricos

Quando nossos objetos são representados por atributos categóricos, eles não possuem uma similaridade espacial. Para calcularmos a similaridade entre eles podemos primeiro transformar nosso vetor de atrbutos em um vetor binário indicando, para cada possível valor de cada atributo, se ele possui esse atributo ou não.

Com o vetor binário podemos utilizar a distância de Hamming definida por:

$$ H(x,y) = \sum_{i=1}^{n}{x_i != y_i} $$

Também é possível definir a distância de Jaccard como:

$$ J(x,y) = \frac{\sum_{i=1}^{n}{x_i == y_i} }{\sum_{i=1}^{n}{\max(x_i, y_i}) } $$


In [174]:
# Vamos criar uma função para calcular a distância de Hamming
def Hamming(x,y):
    """Calculates the Hamming distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        H (int): The Hamming distance between x and y.
    """
    return (x!=y).sum()

# Vamos criar uma função para calcular a distância de Jaccard
def Jaccard(x,y):
    """Calculates the Jaccard distance between two binary vectors.

    Args:
        x, y (np.array): Array of binary integers x and y.

    Returns:
        J (int): The Jaccard distance between x and y.
    """
    return (x==y).sum()/float( np.maximum(x,y).sum() )

In [175]:
# Vamos criar uma RDD com valores categóricos
catPointsRDD = sc.parallelize(enumerate([['alto', 'caro', 'azul'],
                             ['medio', 'caro', 'verde'],
                             ['alto', 'barato', 'azul'],
                             ['medio', 'caro', 'vermelho'],
                             ['baixo', 'barato', 'verde'],
                            ]))

In [198]:
# EXERCICIO

# Crie um RDD de chaves únicas utilizando flatMap
chavesRDD = catPointsRDD.flatMap(lambda x: (x[1])).distinct()#.zipWithIndex()

chaves = dict((v,k) for k,v in enumerate(chavesRDD.collect()))
nchaves = len(chaves)
print chaves, nchaves


{'alto': 0, 'medio': 7, 'baixo': 6, 'barato': 1, 'azul': 2, 'verde': 3, 'caro': 5, 'vermelho': 4} 8

In [199]:
assert chaves=={'alto': 0, 'medio': 1, 'baixo': 2, 'barato': 3, 'azul': 4, 'verde': 5, 'caro': 6, 'vermelho': 7}, 'valores incorretos!'
print "OK"

assert nchaves==8, 'número de chaves incorreta'
print "OK"


---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-199-3eeb8e1f9804> in <module>()
----> 1 assert chaves=={'alto': 0, 'medio': 1, 'baixo': 2, 'barato': 3, 'azul': 4, 'verde': 5, 'caro': 6, 'vermelho': 7}, 'valores incorretos!'
      2 print "OK"
      3 
      4 assert nchaves==8, 'número de chaves incorreta'
      5 print "OK"

AssertionError: valores incorretos!

In [200]:
def CreateNP(atributos,chaves):  
    """Binarize the categorical vector using a dictionary of keys.

    Args:
        atributos (list): List of attributes of a given object.
        chaves (dict): dictionary with the relation attribute -> index

    Returns:
        array (np.array): Binary array of attributes.
    """
    
    array = np.zeros(len(chaves))
    for atr in atributos:
        array[ chaves[atr] ] = 1
    return array

# Converte o RDD para o formato binário, utilizando o dict chaves
binRDD = catPointsRDD.map(lambda rec: (rec[0],CreateNP(rec[1], chaves)))
binRDD.collect()


Out[200]:
[(0, array([ 1.,  0.,  1.,  0.,  0.,  1.,  0.,  0.])),
 (1, array([ 0.,  0.,  0.,  1.,  0.,  1.,  0.,  1.])),
 (2, array([ 1.,  1.,  1.,  0.,  0.,  0.,  0.,  0.])),
 (3, array([ 0.,  0.,  0.,  0.,  1.,  1.,  0.,  1.])),
 (4, array([ 0.,  1.,  0.,  1.,  0.,  0.,  1.,  0.]))]

In [201]:
# EXERCICIO
# Procure dentre os comandos do PySpark, um que consiga fazer o produto cartesiano da base com ela mesma
cartBinRDD = binRDD.cartesian(binRDD)#<COMPLETAR>

# Aplique um mapa para transformar nossa RDD em uma RDD de tuplas ((id1,id2), (vetor1,vetor2))
# DICA: primeiro utilize o comando take(1) e imprima o resultado para verificar o formato atual da RDD
cartBinParesRDD = cartBinRDD.map(lambda ((x1,x2),(y1,y2)):((x1,y1),(x2,y2)))#<COMPLETAR>
distRDD = cartPointsParesRDD.map(lambda ((x1,y1),(x2,y2)): Euclid(x2,y2))#<COMPLETAR>

# Aplique um mapa para calcular a Distância de Hamming e Jaccard entre os pares
hamRDD = cartBinParesRDD.map(lambda ((x1,y1),(x2,y2)): Hamming(x2,y2))#<COMPLETAR>
jacRDD = cartBinParesRDD.map(lambda ((x1,y1),(x2,y2)): Jaccard(x2,y2))#<COMPLETAR>

# Encontre a distância máxima, mínima e média, aplicando um mapa que transforma (chave,valor) --> valor
# e utilizando os comandos internos do pyspark para o cálculo da min, max, mean
#statHRDD = hamRDD.<COMPLETAR>
#statJRDD = jacRDD.<COMPLETAR>

Hmin, Hmax, Hmean = hamRDD.min(), hamRDD.max(), hamRDD.mean()
Jmin, Jmax, Jmean = jacRDD.min(), jacRDD.max(), jacRDD.mean()

#Hmin, Hmax, Hmean = statHRDD.<COMPLETAR>, statHRDD.<COMPLETAR>, statHRDD.<COMPLETAR>
#Jmin, Jmax, Jmean = statJRDD.<COMPLETAR>, statJRDD.<COMPLETAR>, statJRDD.<COMPLETAR>

print "\t\tMin\tMax\tMean"
print "Hamming:\t{:.2f}\t{:.2f}\t{:.2f}".format(Hmin, Hmax, Hmean )
print "Jaccard:\t{:.2f}\t{:.2f}\t{:.2f}".format( Jmin, Jmax, Jmean )


		Min	Max	Mean
Hamming:	0.00	6.00	3.52
Jaccard:	0.33	2.67	1.14

In [202]:
assert (Hmin.round(2), Hmax.round(2), Hmean.round(2)) == (0.00,6.00,3.52), 'valores incorretos'
print "OK"
assert (Jmin.round(2), Jmax.round(2), Jmean.round(2)) == (0.33,2.67,1.14), 'valores incorretos'
print "OK"


OK
OK

In [ ]: