Python como lenguaje tiene las siguientes características:
Para mostrar las facilidades de comprensión de los programas en python observe lo siguiente:
for line in open(”file.txt”):
for word in line.split():
if word.endswith(”ing”):
print word
Esto se puede leer línea por línea en inglés
In [ ]:
def repetir(texto, num_veces):
return texto*num_veces
In [ ]:
monty = "Monty Python "
repetir(monty, 3)
In [ ]:
repetir("Hola Puno ", 5)
También existen las funciones anónimas o funciones lambda
In [ ]:
cubo = lambda x: x*x*x
In [ ]:
cubo(3)
Existen datos de tipo númericos (Enteros, flotantes, etc.)
In [ ]:
variable_1 = 10
In [ ]:
variable_1
Una cadena es una secuencia de caracteres.
In [ ]:
variable_2 = "John Smith"
variable_2
Una lista es un conjunto ordenado de elementos
In [ ]:
variable_3 = [10, "John Smith", ['another', 'list']]
variable_3
In [ ]:
variable_3[2] = "Adolfo"
variable_3
Una tupla es una lista inmutable
In [ ]:
variable_4 = (10, "John Smith")
variable_4
Un diccionario contiene pares llave-valor (KV)
In [ ]:
variable_5 = {"name": "John Smith", "age": 45}
variable_5
In [ ]:
variable_5['name']
In [ ]:
import os
In [ ]:
os.listdir("data/data_berka/")
Se puede obtener lo mismo con comandos mágicos de Jupyter
In [ ]:
!ls data/data_berka/
YARNEC2 o Mesos.Python, Java, Scala y recientemente en RRDDs es una abstracción que representa una colleción de objetos de sólo lectura que está particionada a lo largo de varias máquinas.Python, R, Java o Scala) incluidos tipos definidos por el programador.Soportan dos tipos de operaciones
Las transformaciones construyen un RDD nuevo a partir del anterior.
Las acciones calculan un resultado basado en el RDD.
La diferencia es que las RDD son computadas en forma lazy, sólo son ejecutadas hasta la acción.
Si quieres usarlo una RDD varias veces debes de persistirla (con persist()).
In [ ]:
accounts = sc.textFile("data/data_berka/account.asc")
In [ ]:
accounts.count()
In [ ]:
accounts.first()
In [ ]:
accounts.take(10)
In [ ]:
not_poplatek = accounts.filter(lambda account: "POPLATEK" not in account)
In [ ]:
not_poplatek.count()
In [ ]:
not_poplatek.first()
In [ ]:
def notPopLatek(account):
return "POPLATEK" not in account
In [ ]:
not_poplatek = accounts.filter(notPopLatek)
In [ ]:
not_poplatek.count()
In [ ]:
not_poplatek.first()
mapRDD, el resultado se guarda en un nuevo RDD.filterRDD.flatMapmap pero regresa un iterador por cada elementodistinct, sampleunion, intersection, substract, cartesian
In [ ]:
numeros = sc.parallelize([1,2,3,4,5])
In [ ]:
numeros.count()
In [ ]:
cuadrados = numeros.map(lambda x: x*x).collect()
In [ ]:
for cuadrado in cuadrados:
print("%i " % (cuadrado))
In [ ]:
pares = numeros.filter(lambda x: x%2 == 0).collect()
In [ ]:
for par in pares:
print("%i " % (par))
In [ ]:
frases = sc.parallelize(["hola estudiantes", "xvii congreso estudiantil"])
palabras = frases.flatMap(lambda frase: frase.split(" "))
palabras.first()
reduceRDD y regresa un elemento del mismo tipo.aggregatecollectRDD completo.akecount, countByValue, top, foreach.
In [ ]:
suma = numeros.reduce(lambda x, y: x + y)
suma
In [ ]:
sumaConteo = numeros.aggregate((0, 0), # Valor inicial
(lambda acc, value: (acc[0] + value, acc[1] + 1)), # Combinamos el RDD con el acumulador
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # Juntamos ambos acumuladores
)
In [ ]:
sumaConteo[0]
In [ ]:
sumaConteo[1]
In [ ]:
sumaConteo[0]/float(sumaConteo[1])
In [ ]:
numeros.take(2)
In [ ]:
numeros.top(3)
In [ ]:
numeros.takeSample(withReplacement=True, num=2, seed=1334 )
In [ ]:
accounts_csv = accounts.\
filter(lambda line: "account_id" not in line).\
map(lambda x: x.split(";"))
accounts_csv.take(5)
In [ ]:
kv_accounts = accounts_csv.map(lambda x: (x[1], x)) # x[1] contiene el district_id
kv_accounts.take(5)
In [ ]:
kv_accounts.keys().first()
In [ ]:
kv_accounts.values().first()
In [ ]:
kv_accounts.sortByKey().take(10)
In [ ]:
kv_districts = sc.textFile("data/data_berka/district.asc").\
filter(lambda x: "A1" not in x).\
map(lambda x: x.split(";")).\
map(lambda x: (x[0], x))
kv_districts.take(5)
In [ ]:
accounts_district = kv_accounts.join(kv_districts)
accounts_district.take(5)
In [ ]:
clients = sc.textFile("data/data_berka/client.asc")
In [ ]:
clients.count()
In [ ]:
clients.first()
In [ ]:
sampled_clients = clients.sample(withReplacement=False, fraction=0.01, seed=1334)
In [ ]:
! rm -R output/sampled_clients/
In [ ]:
sampled_clients.saveAsTextFile("output/sampled_clients")
In [ ]:
accounts_df = sqlContext.read.format('com.databricks.spark.csv').\
options(header='true', delimiter=';').\
load("data/data_berka/account.asc")
In [ ]:
! rm -R output/accounts_csv/
In [ ]:
accounts_df.select("account_id", "district_id", "frequency").\
write.format("com.databricks.spark.csv").\
save("output/accounts_csv")
SparkSQL además de permitirnos interactuar usando SQL con los RDDs (en realidad HQL Hive query language, ver documentación aquí), agregar una capa de abstracción al RDD y lo convierte en un DataFrame análogo al usado en R y Python.
In [ ]:
accounts_df.registerTempTable('accounts')
In [ ]:
sqlCtx.sql('show tables').show()
In [ ]:
sqlCtx.sql('select * from accounts limit 5').show()
En sparkSQL es más fácil tratar con archivos json
In [ ]:
! rm -R data/world_bank*
In [ ]:
! wget http://jsonstudio.com/wp-content/uploads/2014/02/world_bank.zip -P data/
In [ ]:
! unzip data/world_bank.zip -d data/world_bank
In [ ]:
world_bank = sqlCtx.read.json("data/world_bank/world_bank.json")
Automáticamente detecta el esquema de la fuente de datos
In [ ]:
world_bank.printSchema()
In [ ]:
world_bank.registerTempTable("world_bank_projects")
In [ ]:
sqlCtx.sql('show tables').show()
In [ ]:
sqlCtx.sql('select countryshortname, project_name, totalamt, totalcommamt from world_bank_projects order by countryshortname').show()
In [ ]:
projects_by_country = sqlCtx.sql('select countryshortname as country, count(project_name) as num_projects, sum(totalamt) as total_amount from world_bank_projects group by countryshortname order by countryshortname')
projects_by_country.show()
Es posible usar Pandas para hacer análisis, pero hay que tomar en cuenta que esto manda todo el dataset a un sólo nodo (más adelante veremos como usar MLib para hacerlo de manera distribuida)
In [ ]:
import pandas as pd
In [ ]:
projects_by_country_pd = projects_by_country.toPandas()
In [ ]:
projects_by_country_pd.columns
In [ ]:
projects_by_country_pd.describe()
In [ ]:
projects_by_country_pd['country']
In [ ]:
projects_by_country_pd=projects_by_country_pd.set_index(['country'])
In [ ]:
projects_by_country_pd.num_projects
In [ ]:
projects_by_country_pd.head()
In [ ]:
projects_by_country_pd.tail()
In [ ]:
projects_by_country_pd[10:20]
In [ ]:
projects_by_country_pd.ix['Peru']
In [ ]:
%pylab inline
projects_by_country_pd['num_projects'][:10].plot(kind='barh', rot=0, )
Descargaremos del Proyecto Gutenberg el libro Beowulf de J. Lesslie Hall.
In [ ]:
! wget https://www.gutenberg.org/ebooks/16328.txt.utf-8 -P data/books
In [ ]:
def tokenize(texto):
return texto.split()
Veámos que hace esta función
In [ ]:
tokenize("En el bosque, de la China, la chinita se perdió")
In [ ]:
beowulf = sc.textFile("data/books/16328.txt.utf-8") # Creamos el RDD desde archivo
In [ ]:
wordcount = beowulf.map(lambda line: line.lower()).\
flatMap(tokenize).\
map(lambda x: (x,1)).\
reduceByKey(lambda x, y: x + y).\
map(lambda x: (x[1], x[0])).\
sortByKey(ascending=False)
In [ ]:
wordcount.take(50)
Como podemos ver, hay muchas palabras que no dicen nada sobre la obra, para hacer minería de textos tendríamos que limpiarlas...
Veámoslo en pandas
In [ ]:
words_counted = pd.DataFrame(wordcount.collect(), columns=["freq", "word"])
words_counted[:10]
In [ ]:
words_counted['freq'][:10]
In [ ]:
%pylab inline
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16,12));
words_counted['freq'][:100].plot(kind="bar")
ax.set_xticklabels(words_counted['word'][:100]);