NoSQL (MongoDB) (sesión 3)

Esta hoja muestra cómo acceder a bases de datos MongoDB y también a conectar la salida con Jupyter. Se puede utilizar el shell propio de MongoDB en el contenedor usando el programa mongo. La diferencia es que ese programa espera código Javascript y aquí trabajaremos con Python.


In [ ]:
!pip install --upgrade pymongo

In [ ]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

%matplotlib inline
matplotlib.style.use('ggplot')

Usaremos la librería pymongo para python. La cargamos a continuación.


In [ ]:
import pymongo
from pymongo import MongoClient

La conexión se inicia con MongoClient en el host descrito en el fichero docker-compose.yml (mongo).


In [ ]:
client = MongoClient("mongo",27017)
client

In [ ]:
client.list_database_names()
  • Format: 7zipped
  • Files:
    • badges.xml
      • UserId, e.g.: "420"
      • Name, e.g.: "Teacher"
      • Date, e.g.: "2008-09-15T08:55:03.923"
    • comments.xml
      • Id
      • PostId
      • Score
      • Text, e.g.: "@Stu Thompson: Seems possible to me - why not try it?"
      • CreationDate, e.g.:"2008-09-06T08:07:10.730"
      • UserId
    • posts.xml
      • Id
      • PostTypeId
        • 1: Question
        • 2: Answer
      • ParentID (only present if PostTypeId is 2)
      • AcceptedAnswerId (only present if PostTypeId is 1)
      • CreationDate
      • Score
      • ViewCount
      • Body
      • OwnerUserId
      • LastEditorUserId
      • LastEditorDisplayName="Jeff Atwood"
      • LastEditDate="2009-03-05T22:28:34.823"
      • LastActivityDate="2009-03-11T12:51:01.480"
      • CommunityOwnedDate="2009-03-11T12:51:01.480"
      • ClosedDate="2009-03-11T12:51:01.480"
      • Title=
      • Tags=
      • AnswerCount
      • CommentCount
      • FavoriteCount
    • posthistory.xml
      • Id
      • PostHistoryTypeId
        • 1: Initial Title - The first title a question is asked with.
        • 2: Initial Body - The first raw body text a post is submitted with.
        • 3: Initial Tags - The first tags a question is asked with.
        • 4: Edit Title - A question's title has been changed.
        • 5: Edit Body - A post's body has been changed, the raw text is stored here as markdown.
        • 6: Edit Tags - A question's tags have been changed.
        • 7: Rollback Title - A question's title has reverted to a previous version.
        • 8: Rollback Body - A post's body has reverted to a previous version - the raw text is stored here.
        • 9: Rollback Tags - A question's tags have reverted to a previous version.
        • 10: Post Closed - A post was voted to be closed.
        • 11: Post Reopened - A post was voted to be reopened.
        • 12: Post Deleted - A post was voted to be removed.
        • 13: Post Undeleted - A post was voted to be restored.
        • 14: Post Locked - A post was locked by a moderator.
        • 15: Post Unlocked - A post was unlocked by a moderator.
        • 16: Community Owned - A post has become community owned.
        • 17: Post Migrated - A post was migrated.
        • 18: Question Merged - A question has had another, deleted question merged into itself.
        • 19: Question Protected - A question was protected by a moderator
        • 20: Question Unprotected - A question was unprotected by a moderator
        • 21: Post Disassociated - An admin removes the OwnerUserId from a post.
        • 22: Question Unmerged - A previously merged question has had its answers and votes restored.
          • PostId
          • RevisionGUID: At times more than one type of history record can be recorded by a single action. All of these will be grouped using the same RevisionGUID
          • CreationDate: "2009-03-05T22:28:34.823"
          • UserId
          • UserDisplayName: populated if a user has been removed and no longer referenced by user Id
          • Comment: This field will contain the comment made by the user who edited a post
          • Text: A raw version of the new value for a given revision
        • If PostHistoryTypeId = 10, 11, 12, 13, 14, or 15 this column will contain a JSON encoded string with all users who have voted for the PostHistoryTypeId
        • If PostHistoryTypeId = 17 this column will contain migration details of either "from " or "to "
          • CloseReasonId
        • 1: Exact Duplicate - This question covers exactly the same ground as earlier questions on this topic; its answers may be merged with another identical question.
        • 2: off-topic
        • 3: subjective
        • 4: not a real question
        • 7: too localized
    • postlinks.xml
      • Id
      • CreationDate
      • PostId
      • RelatedPostId
      • PostLinkTypeId
        • 1: Linked
        • 3: Duplicate
    • users.xml
      • Id
      • Reputation
      • CreationDate
      • DisplayName
      • EmailHash
      • LastAccessDate
      • WebsiteUrl
      • Location
      • Age
      • AboutMe
      • Views
      • UpVotes
      • DownVotes
    • votes.xml
      • Id
      • PostId
      • VoteTypeId
        • 1: AcceptedByOriginator
        • 2: UpMod
        • 3: DownMod
        • 4: Offensive
        • 5: Favorite - if VoteTypeId = 5 UserId will be populated
        • 6: Close
        • 7: Reopen
        • 8: BountyStart
        • 9: BountyClose
        • 10: Deletion
        • 11: Undeletion
        • 12: Spam
        • 13: InformModerator
      • CreationDate
      • UserId (only for VoteTypeId 5)
      • BountyAmount (only for VoteTypeId 9)

In [ ]:
import csv
from datetime import datetime

def csv_to_mongo(file, coll):
    """
    Carga un fichero CSV en Mongo. file especifica el fichero, coll la colección
    dentro de la base de datos, y date_cols las columnas que serán interpretadas
    como fechas.
    """
    # Convertir todos los elementos que se puedan a números
    def to_numeric(d):
        try:
            return int(d)
        except ValueError:
            try:
                return float(d)
            except ValueError:
                return d
    
    def to_date(d):
        """To ISO Date. If this cannot be converted, return NULL (None)"""
        try:
            return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            return None
    
    coll.drop()

    with open(file, encoding='utf-8') as f:
        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')
        
        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns = next(reader)
        
        # Las columnas que contienen 'Date' se interpretan como fechas
        func_to_cols = list(map(lambda c: to_date if 'date' in c.lower() else to_numeric, columns))
        
        docs=[]
        for row in reader:
            row = [func(e) for (func,e) in zip(func_to_cols, row)]
            docs.append(dict(zip(columns, row)))
        coll.insert_many(docs)

In [ ]:
import os
import os.path as path
from urllib.request import urlretrieve

def download_csv_upper_dir(baseurl, filename):
    file = path.abspath(path.join(os.getcwd(),os.pardir,filename))
    if not os.path.isfile(file):
        urlretrieve(baseurl + '/' + filename, file)

In [ ]:
baseurl = 'http://neuromancer.inf.um.es:8080/es.stackoverflow/'
download_csv_upper_dir(baseurl, 'Posts.csv')
download_csv_upper_dir(baseurl, 'Users.csv')
download_csv_upper_dir(baseurl, 'Tags.csv')
download_csv_upper_dir(baseurl, 'Comments.csv')
download_csv_upper_dir(baseurl, 'Votes.csv')

Las bases de datos se crean conforme se nombran. Se puede utilizar la notación punto o la de diccionario. Las colecciones también.


In [ ]:
db = client.stackoverflow
db = client['stackoverflow']
db

Las bases de datos están compuestas por un conjunto de colecciones. Cada colección aglutina a un conjunto de objetos (documentos) del mismo tipo, aunque como vimos en teoría, cada documento puede tener un conjunto de atributos diferente.


In [ ]:
posts = db.posts
posts

Importación de los ficheros CSV. Por ahora creamos una colección diferente para cada uno. Después estudiaremos cómo poder optimizar el acceso usando agregación.


In [ ]:
csv_to_mongo('../Posts.csv',db.posts)

In [ ]:
csv_to_mongo('../Users.csv',db.users)

In [ ]:
csv_to_mongo('../Votes.csv',db.votes)

In [ ]:
csv_to_mongo('../Comments.csv',db.comments)

In [ ]:
csv_to_mongo('../Tags.csv',db.tags)

In [ ]:
posts.count_documents({})

El API de colección de MongoDB

El API de colección en Python se puede encontrar aquí: https://api.mongodb.com/python/current/api/pymongo/collection.html. La mayoría de libros y referencias muestran el uso de mongo desde Javascript, ya que el shell de MongoDB acepta ese lenguaje. La sintaxis con respecto a Python cambia un poco, y se puede seguir en el enlace anterior.


In [ ]:
post = posts.find_one()
post

Utilizo la librería pp para imprimir los objetos grandes de una manera amigable.


In [ ]:
users = db.users
pp(users.find_one())

A cada objeto se le asigna una clave implícita con nombre "_id" (si el objeto no lo incluye).


In [ ]:
print (type(post['_id']))
post['_id']

La siguiente sintaxis está descatalogada en las nuevas versiones, pero era más conveniente:


In [ ]:
#posts.save(post)

Ahora hay que hacerlo así (el resultado debe ser 1 documento modificado):


In [ ]:
result = posts.replace_one({"_id": post['_id']}, post)
result.modified_count

In [ ]:
post = posts.find_one()
pp(post)

In [ ]:
for k,v in post.items():
    print("%s: %s" % (k,v))

Además de find_one(), la función principal de búsqueda es find(). Esta función ofrece un conjunto muy ámplio de opciones para búsqueda, que estudiaremos a continuación.

Primero, una consulta sencilla, con el valor de un campo:


In [ ]:
respuestas = posts.find({"PostTypeId": 2})
respuestas.count()

También existe explain(), al estilo de SQL.


In [ ]:
posts.find({"PostTypeId": 2}).explain()

También se puede limitar la búsqueda.


In [ ]:
respuestas = posts.find({"PostTypeId": 2}).limit(10)

La respuesta no es un conjunto de elementos, sino un cursor que puede ir recorriéndose.


In [ ]:
respuestas

In [ ]:
list(respuestas)

También se puede importar en un dataframe de pandas:


In [ ]:
respuestas = posts.find({"PostTypeId": 2}).limit(30)
df = pd.DataFrame(respuestas)
df['Id'].plot()

La función find() tiene un gran número de posibilidades para especificar la búsqueda. Se pueden utilizar cualificadores complejos como:

  • $and
  • $or
  • $not

Estos calificadores unen "objetos", no valores. Por otro lado, hay otros calificadores que se refieren a valores:

  • $lt (menor)
  • $lte (menor o igual)
  • $gt (mayor)
  • $gte (mayor o igual)

In [ ]:
respuestas = posts.find({ '$and' : [ {"PostTypeId": 2} ,
                                    {"Id" : {'$gte' : 100}} ]}).limit(10)
list(respuestas)

Map-Reduce

Mongodb incluye dos APIs para procesar y buscar documentos: el API de Map-Reduce y el API de agregación. Veremos primero el de Map-Reduce. Manual: https://docs.mongodb.com/manual/aggregation/#map-reduce


In [ ]:
from bson.code import Code

In [ ]:
map = Code(
'''
function () {
    emit(this.OwnerUserId, 1);
}
''')

In [ ]:
reduce = Code(
'''
function (key, values)
{
    return Array.sum(values);
}
''')

In [ ]:
results = db.posts.map_reduce(map, reduce, "myresults")

In [ ]:
db.list_collection_names()

In [ ]:
list(results.find())

In [ ]:
results = db.posts.map_reduce(map, reduce, "myresults", query={"Score": {'$gt' : 20}})

In [ ]:
list(results.find())

In [ ]:
db.users.find_one({'Id':20})

También hay operaciones específicas de la coleción, como count(), groupby() y distinct():


In [ ]:
db.posts.distinct('Score')

Proyección:


In [ ]:
respuestas = db['posts'].aggregate( [ {'$project' : { 'Id' : True }}, {'$limit': 20} ])
list(respuestas)

Lookup!


In [ ]:
respuestas = posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        }
        ])
list(respuestas)

El $lookup genera un array con todos los resultados. El operador $arrayElementAt accede al primer elemento.


In [ ]:
respuestas = db.posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        },
        { '$project' :
        {
            'Id' : True,
            'Score' : True,
            'username' : {'$arrayElemAt' : ['$owner.DisplayName', 0]},
            'owner.DisplayName'  : True
        }}
        ])
list(respuestas)

$unwind también puede usarse. "Desdobla" cada fila por cada elemento del array. En este caso, como sabemos que el array sólo contiene un elemento, sólo habrá una fila por fila original, pero sin el array. Finalmente se puede proyectar el campo que se quiera.


In [ ]:
respuestas = db.posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        },
        { '$unwind': '$owner'},
        { '$project' : 
         {
             'Id' : True,
             'Score': True,
             'username': '$owner.DisplayName'
         }
        }
        ])
list(respuestas)

Se pueden crear más índices, de tipos ASCENDING, DESCENDING, HASHED, y otros geoespaciales. https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.create_index


In [ ]:
posts.create_index([('Id', pymongo.HASHED)])

EJERCICIO: Separar en dos colecciones las preguntas de las respuestas


In [ ]:

EJERCICIO: Con Map-Reduce y Agregación, mostrar las consulta RQ1 de la sesión 2


In [ ]: