Esta hoja muestra cómo acceder a bases de datos MongoDB y también a conectar la salida con Jupyter. Se puede utilizar el shell propio de MongoDB en la máquina virtual usando el programa mongo
. La diferencia es que ese programa espera código Javascript y aquí trabajaremos con Python.
In [ ]:
!pip install --upgrade pymongo
In [ ]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
matplotlib.style.use('ggplot')
Usaremos la librería pymongo
para python. La cargamos a continuación.
In [ ]:
import pymongo
from pymongo import MongoClient
La conexión se inicia con MongoClient
en el host
descrito en el fichero docker-compose.yml
(mongo
).
In [ ]:
client = MongoClient("mongo",27017)
client
In [ ]:
client.list_database_names()
1
: AcceptedByOriginator2
: UpMod3
: DownMod4
: Offensive5
: Favorite - if VoteTypeId = 5 UserId will be populated6
: Close7
: Reopen8
: BountyStart9
: BountyClose10
: Deletion11
: Undeletion12
: Spam13
: InformModeratorLas bases de datos se crean conforme se nombran. Se puede utilizar la notación punto o la de diccionario. Las colecciones también.
In [ ]:
db = client.stackoverflow
db = client['stackoverflow']
db
Las bases de datos están compuestas por un conjunto de colecciones. Cada colección aglutina a un conjunto de objetos (documentos) del mismo tipo, aunque como vimos en teoría, cada documento puede tener un conjunto de atributos diferente.
In [ ]:
posts = db.posts
posts
Importación de los ficheros CSV. Por ahora creamos una colección diferente para cada uno. Después estudiaremos cómo poder optimizar el acceso usando agregación.
In [ ]:
import os
import os.path as path
from urllib.request import urlretrieve
def download_csv_upper_dir(baseurl, filename):
file = path.abspath(path.join(os.getcwd(),os.pardir,filename))
if not os.path.isfile(file):
urlretrieve(baseurl + '/' + filename, file)
baseurl = 'http://neuromancer.inf.um.es:8080/es.stackoverflow/'
download_csv_upper_dir(baseurl, 'Posts.csv')
download_csv_upper_dir(baseurl, 'Users.csv')
download_csv_upper_dir(baseurl, 'Tags.csv')
download_csv_upper_dir(baseurl, 'Comments.csv')
download_csv_upper_dir(baseurl, 'Votes.csv')
In [ ]:
import csv
from datetime import datetime
def csv_to_mongo(file, coll):
"""
Carga un fichero CSV en Mongo. file especifica el fichero, coll la colección
dentro de la base de datos, y date_cols las columnas que serán interpretadas
como fechas.
"""
# Convertir todos los elementos que se puedan a números
def to_numeric(d):
try:
return int(d)
except ValueError:
try:
return float(d)
except ValueError:
return d
def to_date(d):
"""To ISO Date. If this cannot be converted, return NULL (None)"""
try:
return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
return None
coll.drop()
with open(file, encoding='utf-8') as f:
# La llamada csv.reader() crea un iterador sobre un fichero CSV
reader = csv.reader(f, dialect='excel')
# Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
columns = next(reader)
# Las columnas que contienen 'Date' se interpretan como fechas
func_to_cols = list(map(lambda c: to_date if 'date' in c.lower() else to_numeric, columns))
docs=[]
for row in reader:
row = [func(e) for (func,e) in zip(func_to_cols, row)]
docs.append(dict(zip(columns, row)))
coll.insert_many(docs)
In [ ]:
csv_to_mongo('../Posts.csv',db.posts)
In [ ]:
csv_to_mongo('../Users.csv',db.users)
In [ ]:
csv_to_mongo('../Votes.csv',db.votes)
In [ ]:
csv_to_mongo('../Comments.csv',db.comments)
In [ ]:
csv_to_mongo('../Tags.csv',db.tags)
In [ ]:
posts.count_documents()
El API de colección en Python se puede encontrar aquí: https://api.mongodb.com/python/current/api/pymongo/collection.html. La mayoría de libros y referencias muestran el uso de mongo desde Javascript, ya que el shell de MongoDB acepta ese lenguaje. La sintaxis con respecto a Python cambia un poco, y se puede seguir en el enlace anterior.
In [ ]:
(
db.posts.create_index([('Id', pymongo.HASHED)]),
db.comments.create_index([('Id', pymongo.HASHED)]),
db.users.create_index([('Id', pymongo.HASHED)])
)
Mongodb incluye dos APIs para procesar y buscar documentos: el API de Map-Reduce y el API de agregación. Veremos primero el de Map-Reduce. Manual: https://docs.mongodb.com/manual/aggregation/#map-reduce
In [ ]:
from bson.code import Code
In [ ]:
map = Code(
'''
function () {
emit(this.OwnerUserId, 1);
}
''')
In [ ]:
reduce = Code(
'''
function (key, values)
{
return Array.sum(values);
}
''')
In [ ]:
results = posts.map_reduce(map, reduce, "posts_by_userid")
In [ ]:
posts_by_userid = db.posts_by_userid
list(posts_by_userid.find())
Se le puede añadir una etiqueta para especificar sobre qué elementos queremos trabajar (query
):
La función map_reduce
puede llevar añadida una serie de keywords, los mismos especificados en la documentación:
query
: Restringe los datos que se tratansort
: Ordena los documentos de entrada por alguna clavelimit
: Limita el número de resultadosout
: Especifica la colección de salida y otras opciones. Lo veremos después.En el parámetro out
se puede especificar en qué colección se quedarán los datos resultado del map-reduce. Por defecto, en la colección origen. (Todos los parámetros aquí: https://docs.mongodb.com/manual/reference/command/mapReduce/#mapreduce-out-cmd). En la operación map_reduce()
podemos especificar la colección de salida, pero también podemos añadir un parámetro final out={...}
.
Hay varias posibilidades para out
:
replace
: Sustituye la colección, si la hubiera, con la especificada (p. ej.: out={ "replace" : "coll" }
.merge
: Mezcla la colección existente, sustituyendo los documentos que existan por los generados.reduce
: Si existe un documento con el mismo _id en la colección, se aplica la función reduce
para fusionar ambos documentos y producir un nuevo documento.Veremos a continuación, al resolver el ejercicio de crear post_comments
con map-reduce cómo se utilizan estas posibilidades.
También hay operaciones específicas de la coleción, como count()
, groupby()
y distinct()
:
In [ ]:
db.posts.distinct('Score')
Veremos la resolución de este ejercicio para que haga de ejemplo para los siguientes a implementar. En primer lugar, una operación map/reduce sólo se puede ejecutar sobre una colección, así que sólo puede contener resultados de la misma. Por lo tanto, con sólo una operación map/reduce no va a ser posible realizar todo el ejercicio.
Así, en primer lugar, parece interesante agrupar todos los comentarios que se han producido de un Post en particular. En cada comentario, el atributo PostId
marca una referencia al Post al que se refiere.
Es importante cómo se construyen las operaciones map()
y reduce()
. Primero, la función map()
se ejecutará para todos los documentos (o para todos los que cumplan la condición si se utiliza el modificador query=
). Sin embargo, la función reduce()
no se ejecutará a no ser que haya más de un elemento asociado a la misma clave.
Por lo tanto, la salida de la función map()
debe ser la misma que la de la función reduce()
. En nuestro caso, es un objeto JSON de la forma:
{ type: 'comment', comments: [ {comentario1, comentario2} ] }
En el caso de que sólo se ejecute la función map()
, nótese cómo el objeto tiene la misma composición, pero con un array de sólo un elemento (comentario): sí mismo.
In [ ]:
from bson.code import Code
comments_map = Code('''
function () {
emit(this.PostId, { type: 'comment', comments: [this]});
}
''')
comments_reduce = Code('''
function (key, values) {
comments = [];
values.forEach(function(v) {
if ('comments' in v)
comments = comments.concat(v.comments)
})
return { type: 'comment', comments: comments };
}
''')
db.comments.map_reduce(comments_map, comments_reduce, "post_comments")
In [ ]:
list(db.post_comments.find()[:10])
Esto demuestra que en general el esquema de datos en MongoDB no estaría así desde el principio.
Después del primer paso de map/reduce, tenemos que construir la colección final que asocia cada Post con sus comentarios. Como hemos construido antes la colección post_comments
indizada por el Id
del Post, podemos utilizar ahora una ejecución de map/reduce que mezcle los datos en post_comments
con los datos en posts
.
La segunda ejecución de map/reduce la haremos sobre posts
, para que el resultado sea completo, incluso para los Posts que no aparecen en comentarios, y por lo tanto tendrán el atributo comments
vacío.
En este caso, debemos hacer que la función map()
produzca una salida de documentos que también están indizados con el atributo Id
, y, como sólo hay uno para cada Id
, la función reduce()
no se ejecutará. Tan sólo se ejecutará para mezclar ambas colecciones, así que la función reduce()
tendrá que estar preparada para mezclar objetos de tipo "comment" y Posts. En cualquier caso, como se puede ver, es válida también aunque sólo se llame con un objeto de tipo Post. Finalmente, la función map()
prepara a cada objeto Post, inicialmente, con una lista de comentarios vacíos
In [ ]:
posts_map = Code("""
function () {
this.comments = [];
emit(this.Id, this);
}
""")
posts_reduce = Code("""
function (key, values) {
comments = []; // The set of comments
obj = {}; // The object to return
values.forEach(function(v) {
if (v['type'] === 'comment')
comments = comments.concat(v.comments);
else // Object
{
obj = v;
// obj.comments will always be there because of the map() operation
comments = comments.concat(obj.comments);
}
})
// Finalize: Add the comments to the object to return
obj.comments = comments;
return obj;
}
""")
db.posts.map_reduce(posts_map, posts_reduce, out={'reduce' : 'post_comments'})
In [ ]:
list(db.post_comments.find()[:10])
Framework de agregación: https://docs.mongodb.com/manual/reference/operator/aggregation/. Y aquí una presentación interesante sobre el tema: https://www.mongodb.com/presentations/aggregation-framework-0?jmp=docs&_ga=1.223708571.1466850754.1477658152
Proyección:
In [ ]:
respuestas = db['posts'].aggregate( [ {'$project' : { 'Id' : True }}, {'$limit': 20} ])
list(respuestas)
Lookup!
In [ ]:
respuestas = posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
}
])
list(respuestas)
El $lookup
genera un array con todos los resultados. El operador $arrayElementAt
accede al primer elemento.
In [ ]:
respuestas = db.posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
},
{ '$project' :
{
'Id' : True,
'Score' : True,
'username' : {'$arrayElemAt' : ['$owner.DisplayName', 0]},
'owner.DisplayName' : True
}}
])
list(respuestas)
$unwind
también puede usarse. "Desdobla" cada fila por cada elemento del array. En este caso, como sabemos que el array sólo contiene un elemento, sólo habrá una fila por fila original, pero sin el array. Finalmente se puede proyectar el campo que se quiera.
In [ ]:
respuestas = db.posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
},
{ '$unwind': '$owner'},
{ '$project' :
{
'username': '$owner.DisplayName'
}
}
])
list(respuestas)
Como ejemplo de consulta compleja con el Framework de Agregación, adjunto una posible solución a la consulta RQ4:
In [ ]:
RQ4 = db.posts.aggregate( [
{ "$match" : {"PostTypeId": 2}},
{'$lookup': {
'from': "posts",
'localField': "ParentId",
'foreignField': "Id",
'as': "question"
}
},
{
'$unwind' : '$question'
},
{
'$project' : { 'OwnerUserId': True,
'OP' : '$question.OwnerUserId'
}
},
{
'$group' : {'_id' : {'min' : { '$min' : ['$OwnerUserId' , '$OP'] },
'max' : { '$max' : ['$OwnerUserId' , '$OP'] }},
'pairs' : {'$addToSet' : { '0q': '$OP', '1a': '$OwnerUserId'}}
}
},
{
'$project': {
'pairs' : True,
'npairs' : { '$size' : '$pairs'}
}
},
{
'$match' : { 'npairs' : { '$eq' : 2}}
}
])
RQ4 = list(RQ4)
RQ4
La explicación es como sigue:
posts
para recuperar los datos de la preguntapairs
), y sólo se añadirá una vez las posibles combinaciones iguales de preguntador/respondiente.La implementación en Map-Reduce se puede realizar con la misma idea.
En el caso de que queramos tener como referencia las preguntas y respuestas a las que se refiere la conversación, se puede añadir un campo más que guarde todas las preguntas junto con sus respuestas consideradas
In [ ]:
RQ4 = db.posts.aggregate( [
{'$match': { 'PostTypeId' : 2}},
{'$lookup': {
'from': "posts",
'localField': "ParentId",
'foreignField': "Id",
'as': "question"}
},
{
'$unwind' : '$question'
},
{
'$project' : {'OwnerUserId': True,
'QId' : '$question.Id',
'AId' : '$Id',
'OP' : '$question.OwnerUserId'
}
},
{
'$group' : {'_id' : {'min' : { '$min' : ['$OwnerUserId' , '$OP'] },
'max' : { '$max' : ['$OwnerUserId' , '$OP'] }},
'pairs' : {'$addToSet' : { '0q':'$OP', '1a': '$OwnerUserId'}},
'considered_pairs' : { '$push' : {'QId' : '$QId', 'AId' : '$AId'}}
}
},
{
'$project': {
'pairs' : True,
'npairs' : { '$size' : '$pairs'},
'considered_pairs' : True
}
},
{
'$match' : { 'npairs' : { '$eq' : 2}}
}
])
RQ4 = list(RQ4)
RQ4
In [ ]:
(db.posts.find_one({'Id': 238}), db.posts.find_one({'Id': 243}),
db.posts.find_one({'Id': 222}), db.posts.find_one({'Id': 223}))
Veamos cómo calcular el tiempo medio desde que se hace una pregunta hasta que se le da la primera respuesta. En este caso se puede utilizar las respuestas para apuntar a qué pregunta correspondieron. No se considerarán pues las preguntas que no tienen respuesta, lo cual es razonable. Sin embargo, la función map debe guardar también las preguntas para poder calcular el tiempo menor (la primera repuesta).
In [ ]:
from bson.code import Code
# La función map agrupará todas las respuestas, pero también necesita las
mapcode = Code("""
function () {
if (this.PostTypeId == 2)
emit(this.ParentId, {q: null, a: {Id: this.Id, CreationDate: this.CreationDate}, diff: null})
else if (this.PostTypeId == 1)
emit(this.Id, {q: {Id: this.Id, CreationDate: this.CreationDate}, a: null, diff: null})
}
""")
reducecode = Code("""
function (key, values) {
q = null // Pregunta
a = null // Respuesta con la fecha más cercana a la pregunta
values.forEach(function(v) {
if (v.q != null) // Pregunta
q = v.q
if (v.a != null) // Respuesta
{
if (a == null || v.a.CreationDate < a.CreationDate)
a = v.a
}
})
mindiff = null
if (q != null && a != null)
mindiff = a.CreationDate - q.CreationDate;
return {q: q, a: a, diff: mindiff}
}
""")
db.posts.map_reduce(mapcode, reducecode, "min_response_time")
In [ ]:
mrt = list(db.min_response_time.find())
In [ ]:
from pandas.io.json import json_normalize
df = json_normalize(mrt)
In [ ]:
df.index=df["_id"]
In [ ]:
df
In [ ]:
df['value.diff'].plot()
Esto sólo calcula el tiempo mínimo de cada pregunta a su respuesta. Después habría que aplicar lo visto en otros ejemplos para calcular la media. Con agregación, a continuación, sí que se puede calcular la media de forma relativament sencilla:
In [ ]:
min_answer_time = db.posts.aggregate([
{"$match" : {"PostTypeId" : 2}},
{
'$group' : {'_id' : '$ParentId',
# 'answers' : { '$push' : {'Id' : "$Id", 'CreationDate' : "$CreationDate"}},
'min' : {'$min' : "$CreationDate"}
}
},
{ "$lookup" : {
'from': "posts",
'localField': "_id",
'foreignField': "Id",
'as': "post"}
},
{ "$unwind" : "$post"},
{"$project" :
{"_id" : True,
"min" : True,
#"post" : True,
"diff" : {"$subtract" : ["$min", "$post.CreationDate"]}}
},
# { "$sort" : {'_id' : 1} }
{
"$group" : {
"_id" : None,
"avg" : { "$avg" : "$diff"}
}
}
])
min_answer_time = list(min_answer_time)
min_answer_time
In [ ]:
In [ ]:
In [ ]:
In [ ]: