Python

Python como lenguaje tiene las siguientes características:

  • Alto nivel
  • Intepretado
  • Orientado a objetos (pero en realidad multiparadigma)

Ejemplo de programa

Para mostrar las facilidades de comprensión de los programas en python observe lo siguiente:

for line in open(file.txt):
    for word in line.split():
        if word.endswith(ing):
            print word

Esto se puede leer línea por línea en inglés

Funciones en python

En términos básicos una función es una manera de empaquetar código para su posterior reuso.


In [ ]:
def repetir(texto, num_veces):
    return texto*num_veces

In [ ]:
monty = "Monty Python "
repetir(monty, 3)

In [ ]:
repetir("Hola Puno ", 5)

También existen las funciones anónimas o funciones lambda


In [ ]:
cubo = lambda x: x*x*x

In [ ]:
cubo(3)

Tipos de datos en python

Existen datos de tipo númericos (Enteros, flotantes, etc.)


In [ ]:
variable_1 = 10

In [ ]:
variable_1

Una cadena es una secuencia de caracteres.


In [ ]:
variable_2 = "John Smith"
variable_2

Una lista es un conjunto ordenado de elementos


In [ ]:
variable_3 = [10, "John Smith", ['another', 'list']]
variable_3

In [ ]:
variable_3[2] = "Adolfo"
variable_3

Una tupla es una lista inmutable


In [ ]:
variable_4 = (10, "John Smith")
variable_4

Un diccionario contiene pares llave-valor (KV)


In [ ]:
variable_5 = {"name": "John Smith", "age": 45}
variable_5

In [ ]:
variable_5['name']

Ejemplo de uso de bibliotecas de sistema operativo


In [ ]:
import os

In [ ]:
os.listdir("data/data_berka/")

Se puede obtener lo mismo con comandos mágicos de Jupyter


In [ ]:
!ls data/data_berka/

Apache Spark

A fast and general engine for large scale data processing

  • Framework de cómputo general para clusters
  • Ejecuta en YARN
    • Aunque también puede hacer standalone, o ejecutar sobre EC2 o Mesos.
  • Soporta varios lenguajes
    • Python, Java, Scala y recientemente en R

¿Por qué Apache Spark?

  • Algoritmos iterativos
  • Exploración interactiva
  • Plataforma unificada de análisis de datos (en gran escala)
    • batch, interactivo, streaming

Historia de Spark

  • Mesos, era un framework distribuido creado como proyecto para una clase en UC Berkeley in 2009.
  • Spark fué creado para ver si Mesos funcionaba.
  • Fué abierto a partir de 2010

Resilient Distributed Datasets (RDDs)

  • Es una de las ideas principales de Spark.
  • RDDs es una abstracción que representa una colleción de objetos de sólo lectura que está particionada a lo largo de varias máquinas.
  • Sus ventajas:
    • Pueden ser reconstruidas a partir de su lineage. (Soportan fallos...)
    • Pueden ser accesadas vía operaciones en paralelo, parecidas a MapReduce.
    • Son cached en memoria para su uso inmediato.
    • Fueron construidas para ser almacenadas de manera distribuida.
    • Contienen cualquier tipo de dato (ya sea de Python, R, Java o Scala) incluidos tipos definidos por el programador.
  • Soportan dos tipos de operaciones

    • Transformaciones
    • Acciones.
  • Las transformaciones construyen un RDD nuevo a partir del anterior.

    • Cada transformación queda guardada por =Spark= en el /lineage graph/ un DAG.
  • Las acciones calculan un resultado basado en el RDD.

  • La diferencia es que las RDD son computadas en forma lazy, sólo son ejecutadas hasta la acción.

  • Si quieres usarlo una RDD varias veces debes de persistirla (con persist()).

Flujo típico de trabajo

  1. Crear un RDD a partir de datos externos.
  2. Transformarlo a nuevos RDDs.
  3. Persistir algunos RDDs para su uso posterior.
  4. Lanzar acciones.

Ejemplo de flujo de trabajo


In [ ]:
accounts = sc.textFile("data/data_berka/account.asc")

In [ ]:
accounts.count()

In [ ]:
accounts.first()

In [ ]:
accounts.take(10)

In [ ]:
not_poplatek = accounts.filter(lambda account: "POPLATEK" not in account)

In [ ]:
not_poplatek.count()

In [ ]:
not_poplatek.first()

In [ ]:
def notPopLatek(account):
    return "POPLATEK" not in account

In [ ]:
not_poplatek = accounts.filter(notPopLatek)

In [ ]:
not_poplatek.count()

In [ ]:
not_poplatek.first()

Spark - RDD API

  • RDD API

  • From API docs: "immutable, partitioned collection of elements that can be operated on in parallel"

Transformaciones

  • map
    • Usa una función y la aplica a cada elemento del RDD, el resultado se guarda en un nuevo RDD.
  • filter
    • Usa una función y devuelve sólo los elementos que pasan la función (que devuelven verdadero) en el nuevo RDD.
  • flatMap
    • Como el map pero regresa un iterador por cada elemento
      • Por ejemplo una función que divide una cadena.
  • distinct, sample
  • union, intersection, substract, cartesian

In [ ]:
numeros = sc.parallelize([1,2,3,4,5])

In [ ]:
numeros.count()

In [ ]:
cuadrados = numeros.map(lambda x: x*x).collect()

In [ ]:
for cuadrado in cuadrados:
    print("%i " % (cuadrado))

In [ ]:
pares = numeros.filter(lambda x: x%2 == 0).collect()

In [ ]:
for par in pares:
    print("%i " % (par))

In [ ]:
frases = sc.parallelize(["hola estudiantes", "xvii congreso estudiantil"])
palabras = frases.flatMap(lambda frase: frase.split(" "))
palabras.first()

Acciones

  • reduce
    • Opera en dos elementos del mismo tipo del RDD y regresa un elemento del mismo tipo.
  • aggregate
    • Nos permite implementar acumuladores.
  • collect
    • Regresa el RDD completo.
  • =ake
    • Regresa un número =n= de elementos del =RDD=.
  • count, countByValue, top, foreach.

In [ ]:
suma = numeros.reduce(lambda x, y: x + y)
suma

In [ ]:
sumaConteo = numeros.aggregate((0, 0), # Valor inicial
                               (lambda acc, value: (acc[0] + value, acc[1] + 1)), # Combinamos el RDD con el acumulador
                               (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # Juntamos ambos acumuladores
                              )

In [ ]:
sumaConteo[0]

In [ ]:
sumaConteo[1]

In [ ]:
sumaConteo[0]/float(sumaConteo[1])

In [ ]:
numeros.take(2)

In [ ]:
numeros.top(3)

In [ ]:
numeros.takeSample(withReplacement=True, num=2, seed=1334 )

Paired RDD


In [ ]:
accounts_csv = accounts.\
            filter(lambda line: "account_id" not in line).\
            map(lambda x: x.split(";"))
accounts_csv.take(5)

In [ ]:
kv_accounts = accounts_csv.map(lambda x: (x[1], x)) # x[1] contiene el district_id
kv_accounts.take(5)

In [ ]:
kv_accounts.keys().first()

In [ ]:
kv_accounts.values().first()

In [ ]:
kv_accounts.sortByKey().take(10)

In [ ]:
kv_districts = sc.textFile("data/data_berka/district.asc").\
                filter(lambda x: "A1" not in x).\
                map(lambda x: x.split(";")).\
                map(lambda x: (x[0], x))
kv_districts.take(5)

In [ ]:
accounts_district = kv_accounts.join(kv_districts)
accounts_district.take(5)

Leer y escribir

Archivos de texto

Un registro, una línea


In [ ]:
clients = sc.textFile("data/data_berka/client.asc")

In [ ]:
clients.count()

In [ ]:
clients.first()

In [ ]:
sampled_clients = clients.sample(withReplacement=False, fraction=0.01, seed=1334)

In [ ]:
! rm -R output/sampled_clients/

In [ ]:
sampled_clients.saveAsTextFile("output/sampled_clients")

Archivos CSV


In [ ]:
accounts_df = sqlContext.read.format('com.databricks.spark.csv').\
                options(header='true', delimiter=';').\
                load("data/data_berka/account.asc")

In [ ]:
! rm -R output/accounts_csv/

In [ ]:
accounts_df.select("account_id", "district_id", "frequency").\
            write.format("com.databricks.spark.csv").\
            save("output/accounts_csv")

SparkSQL

SparkSQL además de permitirnos interactuar usando SQL con los RDDs (en realidad HQL Hive query language, ver documentación aquí), agregar una capa de abstracción al RDD y lo convierte en un DataFrame análogo al usado en R y Python.


In [ ]:
accounts_df.registerTempTable('accounts')

In [ ]:
sqlCtx.sql('show tables').show()

In [ ]:
sqlCtx.sql('select * from accounts limit 5').show()

SparkSQL y archivos JSON

En sparkSQL es más fácil tratar con archivos json


In [ ]:
! rm -R data/world_bank*

In [ ]:
! wget http://jsonstudio.com/wp-content/uploads/2014/02/world_bank.zip -P data/

In [ ]:
! unzip data/world_bank.zip -d data/world_bank

In [ ]:
world_bank = sqlCtx.read.json("data/world_bank/world_bank.json")

Automáticamente detecta el esquema de la fuente de datos


In [ ]:
world_bank.printSchema()

In [ ]:
world_bank.registerTempTable("world_bank_projects")

In [ ]:
sqlCtx.sql('show tables').show()

In [ ]:
sqlCtx.sql('select countryshortname, project_name, totalamt, totalcommamt from world_bank_projects order by countryshortname').show()

In [ ]:
projects_by_country = sqlCtx.sql('select countryshortname as country, count(project_name) as num_projects, sum(totalamt) as total_amount from world_bank_projects group by countryshortname order by countryshortname')
projects_by_country.show()

SparkSQL y Pandas

Es posible usar Pandas para hacer análisis, pero hay que tomar en cuenta que esto manda todo el dataset a un sólo nodo (más adelante veremos como usar MLib para hacerlo de manera distribuida)


In [ ]:
import pandas as pd

In [ ]:
projects_by_country_pd = projects_by_country.toPandas()

In [ ]:
projects_by_country_pd.columns

In [ ]:
projects_by_country_pd.describe()

In [ ]:
projects_by_country_pd['country']

In [ ]:
projects_by_country_pd=projects_by_country_pd.set_index(['country'])

In [ ]:
projects_by_country_pd.num_projects

In [ ]:
projects_by_country_pd.head()

In [ ]:
projects_by_country_pd.tail()

In [ ]:
projects_by_country_pd[10:20]

In [ ]:
projects_by_country_pd.ix['Peru']

In [ ]:
%pylab inline
projects_by_country_pd['num_projects'][:10].plot(kind='barh', rot=0, )

Ejemplo básico: WordCount

Descargaremos del Proyecto Gutenberg el libro Beowulf de J. Lesslie Hall.


In [ ]:
! wget https://www.gutenberg.org/ebooks/16328.txt.utf-8 -P data/books

In [ ]:
def tokenize(texto):
    return texto.split()

Veámos que hace esta función


In [ ]:
tokenize("En el bosque, de la China, la chinita se perdió")

In [ ]:
beowulf = sc.textFile("data/books/16328.txt.utf-8") # Creamos el RDD desde archivo

In [ ]:
wordcount = beowulf.map(lambda line: line.lower()).\
                        flatMap(tokenize).\
                        map(lambda x: (x,1)).\
                        reduceByKey(lambda x, y: x + y).\
                        map(lambda x: (x[1], x[0])).\
                        sortByKey(ascending=False)

In [ ]:
wordcount.take(50)

Como podemos ver, hay muchas palabras que no dicen nada sobre la obra, para hacer minería de textos tendríamos que limpiarlas...

Veámoslo en pandas


In [ ]:
words_counted = pd.DataFrame(wordcount.collect(), columns=["freq", "word"])
words_counted[:10]

In [ ]:
words_counted['freq'][:10]

In [ ]:
%pylab inline
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16,12));
words_counted['freq'][:100].plot(kind="bar")
ax.set_xticklabels(words_counted['word'][:100]);