A versão do Spark que usaremos nas aulas é 1.6.2, que é a que está disponível neste ambiente.
A abstração principal da Spark é uma coleção distribuída de itens chamada Resilient Distributed Dataset (RDD)(que será detalhada na próxima aula). Os RDDs podem ser criados a partir de Hadoop InputFormats (como arquivos HDFS) ou transformando outros RDDs. Vamos fazer um novo RDD a partir do dataset de notícias usado no trabalho 2:
In [ ]:
newsTextFile = sc.textFile("news.xml")
sc é uma variável que este ambiente fornece para acessarmos o SparkContext.
Os RDDs têm actions, que retornam valores, e transformations, que retornam ponteiros para novos RDDs. Vamos começar com algumas ações:
In [ ]:
newsTextFile.count() # Número de itens nesse RDD. Como é um RDD de text, incialmente, itens aqui são as linhas.
In [ ]:
newsTextFile.first() # Primeiro item neste RDD
Objetivo dessa prática é fazermos uma contagem de palavras e listar as palavras mais frequentes. Entretanto, queremos fazer isso somente sobre o texto das notícias. Como você percebeu na execução anterior e no trabalho T2, o dataset contém algumas linhas iniciais que não são notícias. O primeiro passo então é filtrar somente as linhas que são notícias.
In [ ]:
def is_news(line):
# Dê um corpo a função abaixo.
# Ela deve receber uma linha e retornar True se for uma notícia ou False caso contrário.
return True
onlyNewsLines = newsTextFile.filter(is_news)
onlyNewsLines.take(5)
Ainda preparando o dado, devemos extrair apenas o texto das notícias e analisar apenas notícias com texto. Então faremos as seguintes transformações:
In [ ]:
import lesson1_utils as l1u
newsText = onlyNewsLines.map(l1u.extract_text).filter(lambda text: text != None and len(text) > 0)
newsText.take(5)
Neste momento, nosso RDD é composto somente pelos textos das notícias. Analisemos um pouco o RDD com algumas transformações e ações.
In [ ]:
# A transformação filter retorna um subconjunto do RDD inicial.
# Somente é selecionado o item que obedece, ou seja, retorna True, na função de filtragem.
newsWithBrasil = newsText.filter(lambda text: "Brasil" in text)
print "Quantidade de notícias que citam o 'Brasil': %d" % newsWithBrasil.count()
As ações e transformações de RDD podem ser usadas para computações mais complexas. Vamos dizer que queremos encontrar a notícia com mais palavras:
In [ ]:
newsText.map(lambda text: len( text.split() )).reduce(lambda a, b: a if a > b else b)
A primeira transformação, map (transformação), mapeia um texto para um valor inteiro (quantidade de palavras), criando um novo RDD. reduce (ação) é chamado em que RDD para encontrar a maior quantidade de palavras. Os argumentos para o map e reduce são funções anônimas do Python (lambdas), mas também podemos passar qualquer função Python de nível superior que desejamos. Por exemplo, vamos definir uma função max para tornar este código mais fácil de entender:
In [ ]:
def max(a, b):
if a > b:
return a
else:
return b
newsText.map(lambda text: len( text.split() )).reduce(max)
Um padrão de fluxo de dados comum é MapReduce, como popularizado pelo Hadoop. Com o Spark pode-se implementar MapReduce facilmente, como nesse exemplo de contagem de palavras:
In [ ]:
def toWords(text): return text.split()
def one(word): return (word, 1)
def sum(a, b): return a + b
wordsCounts = newsText.flatMap(toWords).map(one).reduceByKey(sum)
Aqui, combinamos as transformações flatMap, map e reduceByKey para calcular as contagens por palavra no arquivo como um RDD de pares (string, int).
Podemos usar a ação collect para ver o resultado.
In [ ]:
sorted(wordsCounts.collect(), key=lambda (_, c): c, reverse=True)[:10]
In [ ]:
# PASSOS:
# 1. Transforme o arquivo stopwords.pt em um a lista python, onde cada linha é um item da lista
# 2. Implemente uma função que recebe uma lista de palavras, oriundas do texto de uma notícia, e remova as palvras
# que estão presentes na lista de stopwords montadas no passo 1.
# 3. Altere a função toWords, retornando a chamada da função criada no passo 2. recebendo como parâmetro o split
# do texto
# 4. Refaça as chamadas:
# 4.1. wordsCounts = newsText.flatMap(toWords).map(one).reduceByKey(sum)
# 4.2. sorted(wordsCounts.collect(), key=lambda (_, c): c, reverse=True)[:10]
# E só!