NoSQL (MongoDB) (sesión 4)

Esta hoja muestra cómo acceder a bases de datos MongoDB y también a conectar la salida con Jupyter. Se puede utilizar el shell propio de MongoDB en la máquina virtual usando el programa mongo. La diferencia es que ese programa espera código Javascript y aquí trabajaremos con Python.


In [ ]:
!pip install --upgrade pymongo

In [ ]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

%matplotlib inline
matplotlib.style.use('ggplot')

Usaremos la librería pymongo para python. La cargamos a continuación.


In [ ]:
import pymongo
from pymongo import MongoClient

La conexión se inicia con MongoClient en el host descrito en el fichero docker-compose.yml (mongo).


In [ ]:
client = MongoClient("mongo",27017)
client

In [ ]:
client.list_database_names()
  • Format: 7zipped
  • Files:
    • badges.xml
      • UserId, e.g.: "420"
      • Name, e.g.: "Teacher"
      • Date, e.g.: "2008-09-15T08:55:03.923"
    • comments.xml
      • Id
      • PostId
      • Score
      • Text, e.g.: "@Stu Thompson: Seems possible to me - why not try it?"
      • CreationDate, e.g.:"2008-09-06T08:07:10.730"
      • UserId
    • posts.xml
      • Id
      • PostTypeId
        • 1: Question
        • 2: Answer
      • ParentID (only present if PostTypeId is 2)
      • AcceptedAnswerId (only present if PostTypeId is 1)
      • CreationDate
      • Score
      • ViewCount
      • Body
      • OwnerUserId
      • LastEditorUserId
      • LastEditorDisplayName="Jeff Atwood"
      • LastEditDate="2009-03-05T22:28:34.823"
      • LastActivityDate="2009-03-11T12:51:01.480"
      • CommunityOwnedDate="2009-03-11T12:51:01.480"
      • ClosedDate="2009-03-11T12:51:01.480"
      • Title=
      • Tags=
      • AnswerCount
      • CommentCount
      • FavoriteCount
    • posthistory.xml
      • Id
      • PostHistoryTypeId
        • 1: Initial Title - The first title a question is asked with.
        • 2: Initial Body - The first raw body text a post is submitted with.
        • 3: Initial Tags - The first tags a question is asked with.
        • 4: Edit Title - A question's title has been changed.
        • 5: Edit Body - A post's body has been changed, the raw text is stored here as markdown.
        • 6: Edit Tags - A question's tags have been changed.
        • 7: Rollback Title - A question's title has reverted to a previous version.
        • 8: Rollback Body - A post's body has reverted to a previous version - the raw text is stored here.
        • 9: Rollback Tags - A question's tags have reverted to a previous version.
        • 10: Post Closed - A post was voted to be closed.
        • 11: Post Reopened - A post was voted to be reopened.
        • 12: Post Deleted - A post was voted to be removed.
        • 13: Post Undeleted - A post was voted to be restored.
        • 14: Post Locked - A post was locked by a moderator.
        • 15: Post Unlocked - A post was unlocked by a moderator.
        • 16: Community Owned - A post has become community owned.
        • 17: Post Migrated - A post was migrated.
        • 18: Question Merged - A question has had another, deleted question merged into itself.
        • 19: Question Protected - A question was protected by a moderator
        • 20: Question Unprotected - A question was unprotected by a moderator
        • 21: Post Disassociated - An admin removes the OwnerUserId from a post.
        • 22: Question Unmerged - A previously merged question has had its answers and votes restored.
          • PostId
          • RevisionGUID: At times more than one type of history record can be recorded by a single action. All of these will be grouped using the same RevisionGUID
          • CreationDate: "2009-03-05T22:28:34.823"
          • UserId
          • UserDisplayName: populated if a user has been removed and no longer referenced by user Id
          • Comment: This field will contain the comment made by the user who edited a post
          • Text: A raw version of the new value for a given revision
        • If PostHistoryTypeId = 10, 11, 12, 13, 14, or 15 this column will contain a JSON encoded string with all users who have voted for the PostHistoryTypeId
        • If PostHistoryTypeId = 17 this column will contain migration details of either "from " or "to "
          • CloseReasonId
        • 1: Exact Duplicate - This question covers exactly the same ground as earlier questions on this topic; its answers may be merged with another identical question.
        • 2: off-topic
        • 3: subjective
        • 4: not a real question
        • 7: too localized
    • postlinks.xml
      • Id
      • CreationDate
      • PostId
      • RelatedPostId
      • PostLinkTypeId
        • 1: Linked
        • 3: Duplicate
    • users.xml
      • Id
      • Reputation
      • CreationDate
      • DisplayName
      • EmailHash
      • LastAccessDate
      • WebsiteUrl
      • Location
      • Age
      • AboutMe
      • Views
      • UpVotes
      • DownVotes
    • votes.xml
      • Id
      • PostId
      • VoteTypeId
        • 1: AcceptedByOriginator
        • 2: UpMod
        • 3: DownMod
        • 4: Offensive
        • 5: Favorite - if VoteTypeId = 5 UserId will be populated
        • 6: Close
        • 7: Reopen
        • 8: BountyStart
        • 9: BountyClose
        • 10: Deletion
        • 11: Undeletion
        • 12: Spam
        • 13: InformModerator
      • CreationDate
      • UserId (only for VoteTypeId 5)
      • BountyAmount (only for VoteTypeId 9)

Las bases de datos se crean conforme se nombran. Se puede utilizar la notación punto o la de diccionario. Las colecciones también.


In [ ]:
db = client.stackoverflow
db = client['stackoverflow']
db

Las bases de datos están compuestas por un conjunto de colecciones. Cada colección aglutina a un conjunto de objetos (documentos) del mismo tipo, aunque como vimos en teoría, cada documento puede tener un conjunto de atributos diferente.


In [ ]:
posts = db.posts
posts

Importación de los ficheros CSV. Por ahora creamos una colección diferente para cada uno. Después estudiaremos cómo poder optimizar el acceso usando agregación.


In [ ]:
import os
import os.path as path
from urllib.request import urlretrieve

def download_csv_upper_dir(baseurl, filename):
    file = path.abspath(path.join(os.getcwd(),os.pardir,filename))
    if not os.path.isfile(file):
        urlretrieve(baseurl + '/' + filename, file)

baseurl = 'http://neuromancer.inf.um.es:8080/es.stackoverflow/'
download_csv_upper_dir(baseurl, 'Posts.csv')
download_csv_upper_dir(baseurl, 'Users.csv')
download_csv_upper_dir(baseurl, 'Tags.csv')
download_csv_upper_dir(baseurl, 'Comments.csv')
download_csv_upper_dir(baseurl, 'Votes.csv')

In [ ]:
import csv
from datetime import datetime

def csv_to_mongo(file, coll):
    """
    Carga un fichero CSV en Mongo. file especifica el fichero, coll la colección
    dentro de la base de datos, y date_cols las columnas que serán interpretadas
    como fechas.
    """
    # Convertir todos los elementos que se puedan a números
    def to_numeric(d):
        try:
            return int(d)
        except ValueError:
            try:
                return float(d)
            except ValueError:
                return d
    
    def to_date(d):
        """To ISO Date. If this cannot be converted, return NULL (None)"""
        try:
            return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            return None
    
    coll.drop()

    with open(file, encoding='utf-8') as f:
        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')
        
        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns = next(reader)
        
        # Las columnas que contienen 'Date' se interpretan como fechas
        func_to_cols = list(map(lambda c: to_date if 'date' in c.lower() else to_numeric, columns))
        
        docs=[]
        for row in reader:
            row = [func(e) for (func,e) in zip(func_to_cols, row)]
            docs.append(dict(zip(columns, row)))
        coll.insert_many(docs)

In [ ]:
csv_to_mongo('../Posts.csv',db.posts)

In [ ]:
csv_to_mongo('../Users.csv',db.users)

In [ ]:
csv_to_mongo('../Votes.csv',db.votes)

In [ ]:
csv_to_mongo('../Comments.csv',db.comments)

In [ ]:
csv_to_mongo('../Tags.csv',db.tags)

In [ ]:
posts.count_documents()

El API de colección en Python se puede encontrar aquí: https://api.mongodb.com/python/current/api/pymongo/collection.html. La mayoría de libros y referencias muestran el uso de mongo desde Javascript, ya que el shell de MongoDB acepta ese lenguaje. La sintaxis con respecto a Python cambia un poco, y se puede seguir en el enlace anterior.

Creación de índices

Para que el proceso map-reduce y de agregación funcione mejor, voy a crear índices sobre los atributos que se usarán como índice... Ojo, si no se crea las consultas pueden tardar mucho.


In [ ]:
(
    db.posts.create_index([('Id', pymongo.HASHED)]),
    db.comments.create_index([('Id', pymongo.HASHED)]),
    db.users.create_index([('Id', pymongo.HASHED)])
)

Map-Reduce

Mongodb incluye dos APIs para procesar y buscar documentos: el API de Map-Reduce y el API de agregación. Veremos primero el de Map-Reduce. Manual: https://docs.mongodb.com/manual/aggregation/#map-reduce


In [ ]:
from bson.code import Code

In [ ]:
map = Code(
'''
function () {
    emit(this.OwnerUserId, 1);
}
''')

In [ ]:
reduce = Code(
'''
function (key, values)
{
    return Array.sum(values);
}
''')

In [ ]:
results = posts.map_reduce(map, reduce, "posts_by_userid")

In [ ]:
posts_by_userid = db.posts_by_userid
list(posts_by_userid.find())

Se le puede añadir una etiqueta para especificar sobre qué elementos queremos trabajar (query):

La función map_reduce puede llevar añadida una serie de keywords, los mismos especificados en la documentación:

  • query: Restringe los datos que se tratan
  • sort: Ordena los documentos de entrada por alguna clave
  • limit: Limita el número de resultados
  • out: Especifica la colección de salida y otras opciones. Lo veremos después.
  • etc.

En el parámetro out se puede especificar en qué colección se quedarán los datos resultado del map-reduce. Por defecto, en la colección origen. (Todos los parámetros aquí: https://docs.mongodb.com/manual/reference/command/mapReduce/#mapreduce-out-cmd). En la operación map_reduce() podemos especificar la colección de salida, pero también podemos añadir un parámetro final out={...}.

Hay varias posibilidades para out:

  • replace: Sustituye la colección, si la hubiera, con la especificada (p. ej.: out={ "replace" : "coll" }.
  • merge: Mezcla la colección existente, sustituyendo los documentos que existan por los generados.
  • reduce: Si existe un documento con el mismo _id en la colección, se aplica la función reduce para fusionar ambos documentos y producir un nuevo documento.

Veremos a continuación, al resolver el ejercicio de crear post_comments con map-reduce cómo se utilizan estas posibilidades.

También hay operaciones específicas de la coleción, como count(), groupby() y distinct():


In [ ]:
db.posts.distinct('Score')

EJERCICIO (resuelto): Construir, con el API de Map-Reduce, una colección 'post_comments', donde se añade el campo 'Comments' a cada Post con la lista de todos los comentarios referidos a un Post.

Veremos la resolución de este ejercicio para que haga de ejemplo para los siguientes a implementar. En primer lugar, una operación map/reduce sólo se puede ejecutar sobre una colección, así que sólo puede contener resultados de la misma. Por lo tanto, con sólo una operación map/reduce no va a ser posible realizar todo el ejercicio.

Así, en primer lugar, parece interesante agrupar todos los comentarios que se han producido de un Post en particular. En cada comentario, el atributo PostId marca una referencia al Post al que se refiere.

Es importante cómo se construyen las operaciones map() y reduce(). Primero, la función map() se ejecutará para todos los documentos (o para todos los que cumplan la condición si se utiliza el modificador query=). Sin embargo, la función reduce() no se ejecutará a no ser que haya más de un elemento asociado a la misma clave.

Por lo tanto, la salida de la función map() debe ser la misma que la de la función reduce(). En nuestro caso, es un objeto JSON de la forma:

{ type: 'comment', comments: [ {comentario1, comentario2} ] }

En el caso de que sólo se ejecute la función map(), nótese cómo el objeto tiene la misma composición, pero con un array de sólo un elemento (comentario): sí mismo.


In [ ]:
from bson.code import Code

comments_map = Code('''
function () {
  emit(this.PostId, { type: 'comment', comments: [this]});
}
''')

comments_reduce = Code('''
function (key, values) {
    comments = [];
    values.forEach(function(v) {
        if ('comments' in v)
            comments = comments.concat(v.comments)
    })
    return { type: 'comment', comments: comments };
}
''')

db.comments.map_reduce(comments_map, comments_reduce, "post_comments")

In [ ]:
list(db.post_comments.find()[:10])

Esto demuestra que en general el esquema de datos en MongoDB no estaría así desde el principio.

Después del primer paso de map/reduce, tenemos que construir la colección final que asocia cada Post con sus comentarios. Como hemos construido antes la colección post_comments indizada por el Id del Post, podemos utilizar ahora una ejecución de map/reduce que mezcle los datos en post_comments con los datos en posts.

La segunda ejecución de map/reduce la haremos sobre posts, para que el resultado sea completo, incluso para los Posts que no aparecen en comentarios, y por lo tanto tendrán el atributo comments vacío.

En este caso, debemos hacer que la función map() produzca una salida de documentos que también están indizados con el atributo Id, y, como sólo hay uno para cada Id, la función reduce() no se ejecutará. Tan sólo se ejecutará para mezclar ambas colecciones, así que la función reduce() tendrá que estar preparada para mezclar objetos de tipo "comment" y Posts. En cualquier caso, como se puede ver, es válida también aunque sólo se llame con un objeto de tipo Post. Finalmente, la función map() prepara a cada objeto Post, inicialmente, con una lista de comentarios vacíos


In [ ]:
posts_map = Code("""
function () {
  this.comments = [];
  emit(this.Id, this);
}
""")

posts_reduce = Code("""
function (key, values) {
  comments = []; // The set of comments
  obj = {}; // The object to return
  
  values.forEach(function(v) {
    if (v['type'] === 'comment')
      comments = comments.concat(v.comments);
    else // Object
    {
      obj = v;
      // obj.comments will always be there because of the map() operation
      comments = comments.concat(obj.comments);
    }
  })
  
  // Finalize: Add the comments to the object to return
  obj.comments = comments;

  return obj;
}
""")

db.posts.map_reduce(posts_map, posts_reduce, out={'reduce' : 'post_comments'})

In [ ]:
list(db.post_comments.find()[:10])

Proyección:


In [ ]:
respuestas = db['posts'].aggregate( [ {'$project' : { 'Id' : True }}, {'$limit': 20} ])
list(respuestas)

Lookup!


In [ ]:
respuestas = posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        }
        ])
list(respuestas)

El $lookup genera un array con todos los resultados. El operador $arrayElementAt accede al primer elemento.


In [ ]:
respuestas = db.posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        },
        { '$project' :
        {
            'Id' : True,
            'Score' : True,
            'username' : {'$arrayElemAt' : ['$owner.DisplayName', 0]},
            'owner.DisplayName'  : True
        }}
        ])
list(respuestas)

$unwind también puede usarse. "Desdobla" cada fila por cada elemento del array. En este caso, como sabemos que el array sólo contiene un elemento, sólo habrá una fila por fila original, pero sin el array. Finalmente se puede proyectar el campo que se quiera.


In [ ]:
respuestas = db.posts.aggregate( [
        {'$match': { 'Score' : {'$gte': 40}}},
        {'$lookup': {
            'from': "users", 
            'localField': "OwnerUserId",
            'foreignField': "Id",
            'as': "owner"}
        },
        { '$unwind': '$owner'},
        { '$project' : 
         {
             'username': '$owner.DisplayName'
         }
        }
        ])
list(respuestas)

Ejemplo de realización de la consulta RQ4

Como ejemplo de consulta compleja con el Framework de Agregación, adjunto una posible solución a la consulta RQ4:


In [ ]:
RQ4 = db.posts.aggregate( [
        { "$match" : {"PostTypeId": 2}},
        {'$lookup': {
            'from': "posts", 
            'localField': "ParentId",
            'foreignField': "Id",
            'as': "question"
            }
        },
        {
            '$unwind' : '$question'
        },
        {
            '$project' : { 'OwnerUserId': True, 
                           'OP' : '$question.OwnerUserId'
                         }
        },
        {
            '$group' : {'_id' : {'min' : { '$min' : ['$OwnerUserId' , '$OP'] },
                                 'max' : { '$max' : ['$OwnerUserId' , '$OP'] }},
                        'pairs' : {'$addToSet' : { '0q': '$OP', '1a': '$OwnerUserId'}}
                        }
        },
        {
            '$project': {
                'pairs' : True,
                'npairs' : { '$size' : '$pairs'}
            }
        },
        {
            '$match' : { 'npairs' : { '$eq' : 2}}
        }
    ])
RQ4 = list(RQ4)
RQ4

La explicación es como sigue:

  1. Se eligen sólo las respuestas
  2. Se accede a la tabla posts para recuperar los datos de la pregunta
  3. A continuación se proyectan sólo el usuario que pregunta y el que hace la respuesta
  4. El paso más imaginativo es el de agrupación. Lo que se intenta es que ambos pares de usuarios que están relacionados como preguntante -> respondiente y viceversa, caigan en la misma clave. Por ello, se coge el máximo y el mínimo de ambos identificadores de usuarios y se construye una clave con ambos números en las mismas posiciones. Así, ambas combinaciones de usuario que pregunta y que responde caerán en la misma clave. También se usa un conjunto (en pairs), y sólo se añadirá una vez las posibles combinaciones iguales de preguntador/respondiente.
  5. Sólo nos interesan aquellas tuplas cuyo tamaño del conjunto de pares de pregunta/respuesta sea igual a dos (en un elemento uno de los dos usuarios habrá preguntado y el otro habrá respondido y en el otro viceversa).

La implementación en Map-Reduce se puede realizar con la misma idea.

En el caso de que queramos tener como referencia las preguntas y respuestas a las que se refiere la conversación, se puede añadir un campo más que guarde todas las preguntas junto con sus respuestas consideradas


In [ ]:
RQ4 = db.posts.aggregate( [
        {'$match': { 'PostTypeId' : 2}},
        {'$lookup': {
            'from': "posts", 
            'localField': "ParentId",
            'foreignField': "Id",
            'as': "question"}
        },
        {
            '$unwind' : '$question'
        },
        {
            '$project' : {'OwnerUserId': True,
                          'QId' : '$question.Id',
                          'AId' : '$Id',
                          'OP' : '$question.OwnerUserId'
                         }
        },
        {
            '$group' : {'_id' : {'min' : { '$min' : ['$OwnerUserId' , '$OP'] },
                                 'max' : { '$max' : ['$OwnerUserId' , '$OP'] }},
                        'pairs' : {'$addToSet' : { '0q':'$OP', '1a': '$OwnerUserId'}},
                        'considered_pairs' : { '$push' : {'QId' : '$QId', 'AId' : '$AId'}}
                        }
        },
        {
            '$project': {
                'pairs' : True,
                'npairs' : { '$size' : '$pairs'},
                'considered_pairs' : True
            }
        },
        {
            '$match' : { 'npairs' : { '$eq' : 2}}
        }
    ])
RQ4 = list(RQ4)
RQ4

In [ ]:
(db.posts.find_one({'Id': 238}), db.posts.find_one({'Id': 243}),
db.posts.find_one({'Id': 222}), db.posts.find_one({'Id': 223}))

Ejemplo de consulta: Tiempo medio desde que se hace una pregunta hasta que se le da la primera respuesta

Veamos cómo calcular el tiempo medio desde que se hace una pregunta hasta que se le da la primera respuesta. En este caso se puede utilizar las respuestas para apuntar a qué pregunta correspondieron. No se considerarán pues las preguntas que no tienen respuesta, lo cual es razonable. Sin embargo, la función map debe guardar también las preguntas para poder calcular el tiempo menor (la primera repuesta).


In [ ]:
from bson.code import Code

# La función map agrupará todas las respuestas, pero también necesita las 
mapcode = Code("""
function () {
    if (this.PostTypeId == 2)
        emit(this.ParentId, {q: null, a: {Id: this.Id, CreationDate: this.CreationDate}, diff: null})
    else if (this.PostTypeId == 1)
        emit(this.Id, {q: {Id: this.Id, CreationDate: this.CreationDate}, a: null, diff: null})
}
""")

reducecode = Code("""
function (key, values) {
    q = null // Pregunta
    a = null // Respuesta con la fecha más cercana a la pregunta
  
    values.forEach(function(v) {
        if (v.q != null) // Pregunta
            q = v.q
        if (v.a != null) // Respuesta
        {
            if (a == null || v.a.CreationDate < a.CreationDate)
                a = v.a
        }
    })

    mindiff = null
    if (q != null && a != null)
        mindiff = a.CreationDate - q.CreationDate;

    return {q: q, a: a, diff: mindiff}
}
""")

db.posts.map_reduce(mapcode, reducecode, "min_response_time")

In [ ]:
mrt = list(db.min_response_time.find())

In [ ]:
from pandas.io.json import json_normalize

df = json_normalize(mrt)

In [ ]:
df.index=df["_id"]

In [ ]:
df

In [ ]:
df['value.diff'].plot()

Esto sólo calcula el tiempo mínimo de cada pregunta a su respuesta. Después habría que aplicar lo visto en otros ejemplos para calcular la media. Con agregación, a continuación, sí que se puede calcular la media de forma relativament sencilla:


In [ ]:
min_answer_time = db.posts.aggregate([
    {"$match" : {"PostTypeId" : 2}},
    {
        '$group' : {'_id' : '$ParentId',
                   # 'answers' : { '$push' : {'Id' : "$Id", 'CreationDate' : "$CreationDate"}},
                    'min' : {'$min' : "$CreationDate"}
                    }
    },
    { "$lookup" : {
        'from': "posts", 
        'localField': "_id",
        'foreignField': "Id",
        'as': "post"}
    },
    { "$unwind" : "$post"},
    {"$project" :
        {"_id" : True,
        "min" : True,
        #"post" : True,
        "diff" : {"$subtract" : ["$min", "$post.CreationDate"]}}
    },
    #    { "$sort" : {'_id' : 1} }
    {
        "$group" :  {
            "_id" : None,
            "avg" : { "$avg" : "$diff"}
        }
    }
])
min_answer_time = list(min_answer_time)
min_answer_time

EJERCICIO: Con Map-Reduce, construir las colecciones que asocian un usuario con sus tags y los tags con los usuarios que las utilizan (E1).


In [ ]:

EJERCICIO: Con el Framework de Agregación, generar la colección StackOverflowFacts vista en la sesión 2 (E2).


In [ ]:

EJERCICIO: Con Map-Reduce, implementar la consulta RQ3 de la sesión 2.


In [ ]:

EJERCICIO (difícil, opcional): Con Agregación, calcular, enla tabla StackOverflowFacts la media de tiempo que pasa desde que los usuarios se registran hasta que publican su primera pregunta.


In [ ]: