Esta hoja muestra cómo acceder a bases de datos MongoDB y también a conectar la salida con Jupyter. Se puede utilizar el shell propio de MongoDB en el contenedor usando el programa mongo
. La diferencia es que ese programa espera código Javascript y aquí trabajaremos con Python.
In [ ]:
!pip install --upgrade pymongo
In [ ]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
matplotlib.style.use('ggplot')
Usaremos la librería pymongo
para python. La cargamos a continuación.
In [ ]:
import pymongo
from pymongo import MongoClient
La conexión se inicia con MongoClient
en el host
descrito en el fichero docker-compose.yml
(mongo
).
In [ ]:
client = MongoClient("mongo",27017)
client
In [ ]:
client.list_database_names()
1
: AcceptedByOriginator2
: UpMod3
: DownMod4
: Offensive5
: Favorite - if VoteTypeId = 5 UserId will be populated6
: Close7
: Reopen8
: BountyStart9
: BountyClose10
: Deletion11
: Undeletion12
: Spam13
: InformModerator
In [ ]:
import csv
from datetime import datetime
def csv_to_mongo(file, coll):
"""
Carga un fichero CSV en Mongo. file especifica el fichero, coll la colección
dentro de la base de datos, y date_cols las columnas que serán interpretadas
como fechas.
"""
# Convertir todos los elementos que se puedan a números
def to_numeric(d):
try:
return int(d)
except ValueError:
try:
return float(d)
except ValueError:
return d
def to_date(d):
"""To ISO Date. If this cannot be converted, return NULL (None)"""
try:
return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
return None
coll.drop()
with open(file, encoding='utf-8') as f:
# La llamada csv.reader() crea un iterador sobre un fichero CSV
reader = csv.reader(f, dialect='excel')
# Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
columns = next(reader)
# Las columnas que contienen 'Date' se interpretan como fechas
func_to_cols = list(map(lambda c: to_date if 'date' in c.lower() else to_numeric, columns))
docs=[]
for row in reader:
row = [func(e) for (func,e) in zip(func_to_cols, row)]
docs.append(dict(zip(columns, row)))
coll.insert_many(docs)
In [ ]:
import os
import os.path as path
from urllib.request import urlretrieve
def download_csv_upper_dir(baseurl, filename):
file = path.abspath(path.join(os.getcwd(),os.pardir,filename))
if not os.path.isfile(file):
urlretrieve(baseurl + '/' + filename, file)
In [ ]:
baseurl = 'http://neuromancer.inf.um.es:8080/es.stackoverflow/'
download_csv_upper_dir(baseurl, 'Posts.csv')
download_csv_upper_dir(baseurl, 'Users.csv')
download_csv_upper_dir(baseurl, 'Tags.csv')
download_csv_upper_dir(baseurl, 'Comments.csv')
download_csv_upper_dir(baseurl, 'Votes.csv')
Las bases de datos se crean conforme se nombran. Se puede utilizar la notación punto o la de diccionario. Las colecciones también.
In [ ]:
db = client.stackoverflow
db = client['stackoverflow']
db
Las bases de datos están compuestas por un conjunto de colecciones. Cada colección aglutina a un conjunto de objetos (documentos) del mismo tipo, aunque como vimos en teoría, cada documento puede tener un conjunto de atributos diferente.
In [ ]:
posts = db.posts
posts
Importación de los ficheros CSV. Por ahora creamos una colección diferente para cada uno. Después estudiaremos cómo poder optimizar el acceso usando agregación.
In [ ]:
csv_to_mongo('../Posts.csv',db.posts)
In [ ]:
csv_to_mongo('../Users.csv',db.users)
In [ ]:
csv_to_mongo('../Votes.csv',db.votes)
In [ ]:
csv_to_mongo('../Comments.csv',db.comments)
In [ ]:
csv_to_mongo('../Tags.csv',db.tags)
In [ ]:
posts.count_documents({})
El API de colección en Python se puede encontrar aquí: https://api.mongodb.com/python/current/api/pymongo/collection.html. La mayoría de libros y referencias muestran el uso de mongo desde Javascript, ya que el shell de MongoDB acepta ese lenguaje. La sintaxis con respecto a Python cambia un poco, y se puede seguir en el enlace anterior.
In [ ]:
post = posts.find_one()
post
Utilizo la librería pp
para imprimir los objetos grandes de una manera amigable.
In [ ]:
users = db.users
pp(users.find_one())
A cada objeto se le asigna una clave implícita con nombre "_id
" (si el objeto no lo incluye).
In [ ]:
print (type(post['_id']))
post['_id']
La siguiente sintaxis está descatalogada en las nuevas versiones, pero era más conveniente:
In [ ]:
#posts.save(post)
Ahora hay que hacerlo así (el resultado debe ser 1 documento modificado):
In [ ]:
result = posts.replace_one({"_id": post['_id']}, post)
result.modified_count
In [ ]:
post = posts.find_one()
pp(post)
In [ ]:
for k,v in post.items():
print("%s: %s" % (k,v))
Además de find_one()
, la función principal de búsqueda es find()
. Esta función ofrece un conjunto muy ámplio de opciones para búsqueda, que estudiaremos a continuación.
Primero, una consulta sencilla, con el valor de un campo:
In [ ]:
respuestas = posts.find({"PostTypeId": 2})
respuestas.count()
También existe explain()
, al estilo de SQL.
In [ ]:
posts.find({"PostTypeId": 2}).explain()
También se puede limitar la búsqueda.
In [ ]:
respuestas = posts.find({"PostTypeId": 2}).limit(10)
La respuesta no es un conjunto de elementos, sino un cursor que puede ir recorriéndose.
In [ ]:
respuestas
In [ ]:
list(respuestas)
También se puede importar en un dataframe de pandas:
In [ ]:
respuestas = posts.find({"PostTypeId": 2}).limit(30)
df = pd.DataFrame(respuestas)
df['Id'].plot()
La función find()
tiene un gran número de posibilidades para especificar la búsqueda. Se pueden utilizar cualificadores complejos como:
$and
$or
$not
Estos calificadores unen "objetos", no valores. Por otro lado, hay otros calificadores que se refieren a valores:
$lt
(menor)$lte
(menor o igual)$gt
(mayor)$gte
(mayor o igual)
In [ ]:
respuestas = posts.find({ '$and' : [ {"PostTypeId": 2} ,
{"Id" : {'$gte' : 100}} ]}).limit(10)
list(respuestas)
Mongodb incluye dos APIs para procesar y buscar documentos: el API de Map-Reduce y el API de agregación. Veremos primero el de Map-Reduce. Manual: https://docs.mongodb.com/manual/aggregation/#map-reduce
In [ ]:
from bson.code import Code
In [ ]:
map = Code(
'''
function () {
emit(this.OwnerUserId, 1);
}
''')
In [ ]:
reduce = Code(
'''
function (key, values)
{
return Array.sum(values);
}
''')
In [ ]:
results = db.posts.map_reduce(map, reduce, "myresults")
In [ ]:
db.list_collection_names()
In [ ]:
list(results.find())
In [ ]:
results = db.posts.map_reduce(map, reduce, "myresults", query={"Score": {'$gt' : 20}})
In [ ]:
list(results.find())
In [ ]:
db.users.find_one({'Id':20})
También hay operaciones específicas de la coleción, como count()
, groupby()
y distinct()
:
In [ ]:
db.posts.distinct('Score')
Framework de agregación: https://docs.mongodb.com/manual/reference/operator/aggregation/. Y aquí una presentación interesante sobre el tema: https://www.mongodb.com/presentations/aggregation-framework-0?jmp=docs&_ga=1.223708571.1466850754.1477658152
Proyección:
In [ ]:
respuestas = db['posts'].aggregate( [ {'$project' : { 'Id' : True }}, {'$limit': 20} ])
list(respuestas)
Lookup!
In [ ]:
respuestas = posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
}
])
list(respuestas)
El $lookup
genera un array con todos los resultados. El operador $arrayElementAt
accede al primer elemento.
In [ ]:
respuestas = db.posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
},
{ '$project' :
{
'Id' : True,
'Score' : True,
'username' : {'$arrayElemAt' : ['$owner.DisplayName', 0]},
'owner.DisplayName' : True
}}
])
list(respuestas)
$unwind
también puede usarse. "Desdobla" cada fila por cada elemento del array. En este caso, como sabemos que el array sólo contiene un elemento, sólo habrá una fila por fila original, pero sin el array. Finalmente se puede proyectar el campo que se quiera.
In [ ]:
respuestas = db.posts.aggregate( [
{'$match': { 'Score' : {'$gte': 40}}},
{'$lookup': {
'from': "users",
'localField': "OwnerUserId",
'foreignField': "Id",
'as': "owner"}
},
{ '$unwind': '$owner'},
{ '$project' :
{
'Id' : True,
'Score': True,
'username': '$owner.DisplayName'
}
}
])
list(respuestas)
Se pueden crear más índices, de tipos ASCENDING
, DESCENDING
, HASHED
, y otros geoespaciales. https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.create_index
In [ ]:
posts.create_index([('Id', pymongo.HASHED)])
In [ ]:
In [ ]: