NoSQL (HBase) (sesión 5)

Esta hoja muestra cómo acceder a bases de datos HBase y también a conectar la salida con Jupyter.

Se puede utilizar el shell propio de HBase en el contenedor.

Con HBase vamos a simular un clúster de varias máquinas con varios contenedores conectados. En el directorio hbase del repositorio git hay un script para ejecutar la instalación con docker-compose.

Para conectarse al clúster con un shell de hbase, hay que ejecutar, desde una terminal el siguiente comando de docker:

$ docker exec -ti hbase-regionserver hbase shell
Base Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.7, rac57c51f7ad25e312b4275665d62b34a5945422f, Fri Sep  7 16:11:05 CDT 2018

hbase(main):001:0>

In [ ]:
from pprint import pprint as pp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

%matplotlib inline
matplotlib.style.use('ggplot')

Usaremos la librería happybase para python. La cargamos a continuación y hacemos la conexión.


In [ ]:
import os
import os.path as path
from urllib.request import urlretrieve

def download_file_upper_dir(baseurl, filename):
    file = path.abspath(path.join(os.getcwd(),os.pardir,filename))
    if not os.path.isfile(file):
        urlretrieve(baseurl + '/' + filename, file)

baseurl = 'http://neuromancer.inf.um.es:8080/es.stackoverflow/'
download_file_upper_dir(baseurl, 'Posts.csv')
download_file_upper_dir(baseurl, 'Users.csv')
download_file_upper_dir(baseurl, 'Tags.csv')
download_file_upper_dir(baseurl, 'Comments.csv')
download_file_upper_dir(baseurl, 'Votes.csv')

In [ ]:
!pip install happybase

In [ ]:
import happybase

host = 'hbase-thriftserver'
pool = happybase.ConnectionPool(size=5, host=host)
with pool.connection() as connection:
    print(connection.tables())

Para la carga inicial, vamos a crear todas las tablas con una única familia de columnas, rawdata, donde meteremos toda la información raw comprimida. Después podremos hacer reorganizaciones de los datos para hacer el acceso más eficiente. Es una de las muchas ventajas de no tener un esquema.


In [ ]:
# Create tables
tables = ['posts', 'votes', 'users', 'tags', 'comments']
for t in tables:
    try:
        with pool.connection() as connection:
            connection.create_table(
                t,
                {
                    'rawdata': dict(max_versions=1,compression='GZ')
                })
    except Exception as e:
        print("Database already exists: {0}. {1}".format(t, e))
        pass
with pool.connection() as connection:
    print(connection.tables())

El código de importación es siempre el mismo, ya que se coge la primera fila del CSV que contiene el nombre de las columnas y se utiliza para generar nombres de columnas dentro de la familia de columnas dada como parámetro. La función csv_to_hbase() acepta un fichero CSV a abrir, un nombre de tabla y una familia de columnas donde agregar las columnas del fichero CSV. En nuestro caso siempre va a ser rawdata.


In [ ]:
import csv

def csv_to_hbase(file, tablename, cf):
    with pool.connection() as connection, open(file) as f:
        table = connection.table(tablename)

        # La llamada csv.reader() crea un iterador sobre un fichero CSV
        reader = csv.reader(f, dialect='excel')
        
        # Se leen las columnas. Sus nombres se usarán para crear las diferentes columnas en la familia
        columns = next(reader)
        columns = [cf + ':' + c for c in columns]
        
        with table.batch(batch_size=500) as b:
            for row in reader:
                # La primera columna se usará como Row Key
                b.put(row[0], dict(zip(columns[1:], row[1:])))

In [ ]:
for t in tables:
    print("Importando tabla {0}...".format(t))
    %time csv_to_hbase('../'+t.capitalize() + '.csv', t, 'rawdata')

Consultas sencillas desde Python

A continuación veremos algunas consultas sencillas desde python usando el API de happybase.


In [ ]:
with pool.connection() as connection:
    posts = connection.table('posts')

Obtener el Post con Id 5. La orden más sencilla e inmediata de HBase es obtener una fila, opcionalmente limitando las columnas a mostrar:


In [ ]:
posts.row(b'5',columns=[b'rawdata:Body'])

El siguiente código permite mostrar de forma amigable las tablas extraídas de la base de datos en forma de diccionario:


In [ ]:
# http://stackoverflow.com/a/30525061/62365
class DictTable(dict):
    # Overridden dict class which takes a dict in the form {'a': 2, 'b': 3},
    # and renders an HTML Table in IPython Notebook.
    def _repr_html_(self):
        htmltext = ["<table width=100%>"]
        for key, value in self.items():
            htmltext.append("<tr>")
            htmltext.append("<td>{0}</td>".format(key.decode('utf-8')))
            htmltext.append("<td>{0}</td>".format(value.decode('utf-8')))
            htmltext.append("</tr>")
        htmltext.append("</table>")
        return ''.join(htmltext)

In [ ]:
# Muestra cómo queda la fila del Id del Post 9997
DictTable(posts.row(b'5'))

In [ ]:
DictTable(posts.row(b'5',columns=[b'rawdata:AnswerCount',b'rawdata:AcceptedAnswerId']))

Y también se puede recorrer como un diccionario normal (el decode se utiliza para convertir los valores binarios de la base de datos a una codificación UTF-8):


In [ ]:
row = posts.row(b'5')
for key, value in row.items():
    print("Key = '%s', Value = '%s'" % (key, value.decode('utf-8')[:40]))

Finalmente, también se puede recorrer toda la tabla estableciendo filtros, que se estudiarán después. Se utiliza la función scan. Se puede iterar con los parámetros key y data. Por ejemplo, calcular el tamaño máximo de la longitud del texto de los posts:

(OJO, es un ejemplo, no se debería hacer así)


In [ ]:
max_len = 0
for key, data in posts.scan():
    cur_len = len(data[b'rawdata:Body'].decode('utf-8'))
    if cur_len > max_len:
        max_len = cur_len

print("Máxima longitud: %s caracteres." % (max_len))

Construcción de estructuras anidadas

Al igual que pasaba con MongoDB, las bases de datos NoSQL como en este caso HBase permiten almacenar estructuras de datos complejas. En nuestro caso vamos a agregar los comentarios de cada pregunta o respuesta (post) en columnas del mismo. Para ello, creamos una nueva familia de columnas comments.

HBase es bueno para añadir columnas sencillas, por ejemplo que contengan un valor. Sin embargo, si queremos añadir objetos complejos, tenemos que jugar con la codificación de la familia de columnas y columna.

Usaremos el shell porque happybase no permite alterar tablas ya creadas. Para acceder al shell de HBase, tenemos que contactar al contenedor hbase-regionserver, de esta forma:

$ docker exec -ti hbase-regionserver hbase shell

En el shell de HBase pondremos lo siguiente:

disable 'posts'
alter 'posts', {NAME => 'comments', VERSIONS => 1}
enable 'posts'

Cada comentario que añadimos contiene, al menos:

  • un id único
  • un texto
  • un autor
  • etc.

¿Cómo se consigue meterlo en una única familia de columnas?

Hay varias formas. La que usaremos aquí, añadiremos el id de cada comentario como parte del nombre de la columna. Por ejemplo, el comentario con Id 2000, generará las columnas:

  • Id_2000 (valor 2000)
  • UserId_2000
  • PostId_2000
  • Text_2000

con sus correspondientes valores. Así, todos los datos relativos al comentario con Id original 2000, estarán almacenados en todas las columnas que terminen en "_2000". La base de datos permite implementar filtros que nos permiten buscar esto de forma muy sencilla. Los veremos después.


In [ ]:
with pool.connection() as connection:
    comments = connection.table('comments')
    posts = connection.table('posts')

    with posts.batch(batch_size=500) as bp:
        # Hacer un scan de la tabla
        for key, data in comments.scan():
            comment = {'comments:' +
                       d.decode('utf-8').split(':')[1] + "_" +
                       key.decode('utf-8') : 
                       data[d].decode('utf-8') for d in data.keys()}
            bp.put(data[b'rawdata:PostId'], comment)

In [ ]:
DictTable(posts.row(b'7251'))

In [ ]:
%timeit q = posts.row(b'7251')

In [ ]:
from functools import reduce

def doit():
    q = posts.row(b'7251')
    (s,n) = reduce(lambda res, e:
                    (res[0]+len(e[1].decode('utf-8')), res[1]+1) if e[0].decode('utf-8').startswith('comments:Text') else res
                   , q.items(), (0,0))
    return (s/n)

%timeit doit()
# MySQL -> 1.12 ms
# HBase -> 1.47 ms

EJERCICIO: ¿Cómo sería el código para saber qué usuarios han comentado un post en particular?

Wikipedia

Como otro ejemplo de carga de datos y de organización en HBase, veremos de manera simplificada el ejemplo de la wikipedia visto en teoría.

A continuación se descarga una pequeña parte del fichero de la wikipedia en XML:


In [ ]:
download_file_upper_dir('http://neuromancer.inf.um.es:8080/wikipedia/','eswiki.xml.gz')

Se crea la tabla para albergar la wikipedia. Igual que la vista en teoría, pero aquí se usa wikipedia en vez de wiki para que no colisionen la versión completa con la reducida.

De nuevo en el shell de HBase:

create 'wikipedia' , 'text', 'revision'
disable 'wikipedia' # Para evitar su uso temporal
alter 'wikipedia' , { NAME => 'text', VERSIONS => org.apache.hadoop.hbase.HConstants::ALL_VERSIONS }
alter 'wikipedia' , { NAME => 'revision', VERSIONS => org.apache.hadoop.hbase.HConstants::ALL_VERSIONS }
alter 'wikipedia' , { NAME => 'text', COMPRESSION => 'GZ', BLOOMFILTER => 'ROW'}
enable 'wikipedia'

Este código, visto en teoría, recorre el árbol XML construyendo documentos y llamando a la función callback con cada uno. Los documentos son diccionarios con las claves encontradas dentro de los tags <page>...</page>.


In [ ]:
import xml.sax
import re

class WikiHandler(xml.sax.handler.ContentHandler):

    def __init__(self):
        self._charBuffer = ''
        self.document = {}

    def _getCharacterData(self):
        data = self._charBuffer
        self._charBuffer = ''
        return data

    def parse(self, f, callback):
        self.callback = callback
        xml.sax.parse(f, self)

    def characters(self, data):
        self._charBuffer = self._charBuffer + data

    def startElement(self, name, attrs):
        if name == 'page':
        # print 'Start of page'
            self.document = {}
        if re.match(r'title|timestamp|username|comment|text', name):
            self._charBuffer = ''

    def endElement(self, name):
        if re.match(r'title|timestamp|username|comment|text', name):
            self.document[name] = self._getCharacterData()
            # print(name, ': ', self.document[name][:20])
        if 'revision' == name:
            self.callback(self.document)

El codigo a continuación, cada vez que el código anterior llama a la función processdoc() se añade un documento a la base de datos.


In [ ]:
import time
import os
import gzip

class FillWikiTable():
    """Llena la tabla Wiki"""
    def __init__(self,connection):
        # Conectar a la base de datos a través de Thrift
        self.table = connection.table('wikipedia')

    def run(_s):
        def processdoc(d):
            print("Callback called with {0}".format(d['title']))
            tuple_time = time.strptime(d['timestamp'], "%Y-%m-%dT%H:%M:%SZ")
            timestamp = int(time.mktime(tuple_time))
            _s.table.put(d['title'],
                         {'text:': d.get('text',''),
                          'revision:author': d.get('username',''),
                          'revision:comment': d.get('comment','')},
                         timestamp=timestamp)

        with gzip.open(os.path.join(os.pardir,'eswiki.xml.gz'),'r') as f:
            start = time.time()
            WikiHandler().parse(f, processdoc)
            end = time.time()
            print ("End adding documents. Time: %.5f" % (end - start))

In [ ]:
with pool.connection() as connection:
    FillWikiTable(connection).run()

El código a continuación permite ver las diferentes versiones de una revisión. Como la versión reducida es muy pequeña no da lugar a que haya ninguna revisión, pero con este código se vería. Hace uso del shell de HBase:

get 'wikipedia', 'Commodore Amiga', {COLUMN => 'revision',VERSIONS=>10}

Enlazado de documentos en la wikipedia

Los artículos de la wikipedia llevan enlaces entre sí, incluyendo referencias del tipo [[artículo referenciado]]. Se pueden extraer estos enlaces y se puede construir un grafo de conexiones. Para cada artículo, se anotarán qué enlaces hay que salen de él y hacia qué otros artículos enlazan y también qué enlaces llegan a él. Esto se hará con dos familias de columnas, from y to.

En cada momento, se añadirá una columna from:artículo cuando un artículo nos apunte, y otras columnas to:articulo con los artículos que nosotros enlazamos.


In [ ]:
import sys

class BuildLinks():
    """Llena la tabla de Links"""
    def __init__(self,connection):
        # Create table
        try:
            connection.create_table(
                "wikilinks",
                {
                    'from': dict(bloom_filter_type='ROW',max_versions=1),
                    'to' : dict(bloom_filter_type='ROW',max_versions=1)
                })
        except:
            print ("Database wikilinks already exists.")
            pass

        self.table = connection.table('wikilinks')
        self.wikitable = connection.table('wikipedia')

    def run(self):
        print("run")
        linkpattern = r'\[\[([^\[\]\|\:\#][^\[\]\|:]*)(?:\|([^\[\]\|]+))?\]\]'
        # target, label

        with self.table.batch(batch_size=500) as b:
            for key, data in self.wikitable.scan():
                to_dict = {}
                doc = key.strip().decode('utf-8')
                print("\n{0}:".format(doc))
                for mo in re.finditer(linkpattern, data[b'text:'].decode('utf-8')):
                    (target, label) = mo.groups()

                    target = target.strip()

                    if target == '':
                        continue

                    label = '' if not label else label
                    label = label.strip()

                    to_dict['to:' + target] = label

                    sys.stdout.write(".")
                    
                    b.put(target, {'from:' + doc : label})

                if bool(to_dict):
                    b.put(doc, to_dict)

In [ ]:
with pool.connection() as connection:
    BuildLinks(connection).run()

En la siguiente sesión veremos técnicas más sofisticadas de filtrado, pero por ahora se puede jugar con estas construcciones. Se puede seleccionar qué columnas se quiere mostrar e incluso filtros. En el shell:

scan 'wikilinks', {COLUMNS=>'to', FILTER => "ColumnPrefixFilter('A')", LIMIT => 300}

El proceso de scan recorre toda la tabla mostrando sólo las filas seleccionadas. HBase ofrece ciertas optimizaciones para que el escaneo sea eficiente, que veremos en la siguiente sesión.

Una introducción a los filtros y parámetros disponibles se puede ver aquí. En el shell:

scan 'wikipedia', {COLUMNS=>['revision'] , STARTROW => 'A', ENDROW=>'B'}

EJERCICIO: Encontrar páginas que estén enlazadas y que ambas estén en la tabla wikipedia

(Ojo, no estarán todas porque es una versión reducida de la wikipedia)

EJERCICIO: Modificar la tabla posts para añadir una familia de columnas que guarde el histórico de ediciones guardado en PostHistory.csv. Usar como ejemplo la función csv_to_hbase