MongoDB

  • Schema Free
  • Document Based
  • Supports Indexing
  • Not Transactional
  • Does not support relations (no JOIN)
  • Supports Autosharding
  • Automatic Replication and Failover
  • Relies on System Memory Manager
  • Has an Aggregation Pipeline
  • Built-in support for MapReduce

In Python, MongoDB support is provided by the PyMongo library, which can be installed using:

```
$ pip install pymongo
```


Installing MongoDB
------------------

Installing MongoDB is as simple as going to http://www.mongodb.org/downloads and downloading it.

Create a data directory (``mongod`` defaults to ``/data/db``), then start ``mongod`` from inside the downloaded MongoDB package:

```
$ curl -O 'https://fastdl.mongodb.org/osx/mongodb-osx-x86_64-3.0.4.tgz'
$ tar zxvf mongodb-osx-x86_64-3.0.4.tgz
$ cd mongodb-osx-x86_64-3.0.4
$ mkdir data
$ ./bin/mongod --dbpath=./data
```

Using MongoDB

A MongoClient instance provides a connection to a MongoDB server. Each server can host multiple databases, which can be retrieved with connection.database_name; each database can then contain multiple collections with different documents.


In [2]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')

In [4]:
db = client.phonebook
print db.collection_names()


[]

Once the database is retrieved, collections can be accessed as attributes of the database itself.

A MongoDB document is actually just a Python dictionary, so inserting a document is as simple as telling PyMongo to insert the dictionary into the collection. Each document can have its own structure and contain different data; you are not required to declare the structure of the collection. Collections that do not exist are automatically created on the insertion of the first document.


In [15]:
data = {'name': 'Alessandro', 'phone': '+39123456789'}
db.people.insert(data)


Out[15]:
ObjectId('55893be77ab71c669f4c149f')

In [6]:
print db.collection_names()


[u'people', u'system.indexes']

Each inserted document receives an ObjectId, which is a unique identifier of the document. The ObjectId is built from data like the current timestamp, a server identifier, the process id and other data that guarantees it to be unique across multiple servers.

Being designed to work in a distributed, multi-node environment, MongoDB handles "write safety" through the number of servers that are expected to have saved the document before the insert command is considered "completed".

This is controlled by the w option, which indicates the number of servers that must have saved the document before the insert command returns. Setting it to 0 makes MongoDB work in fire-and-forget mode, which is useful when inserting a lot of documents quickly. As most drivers generate the ObjectId on the client that performs the insertion, you will receive an ObjectId even before the document has been written.


In [10]:
db.people.insert({'name': 'Puria', 'phone': '+39123456788', 'other_phone': '+3933332323'}, w=0)


Out[10]:
ObjectId('55893a1d7ab71c669f4c149e')

In [13]:
try:
    db.people.insert({'name': 'Puria', 'phone': '+39123456789'}, w=2)
except Exception as e:
    print e


cannot use 'w' > 1 when a host is not replicated

Fetching back inserted documents can be done using the find and find_one methods of a collection. Both methods accept a query expression that filters the returned documents. Omitting it means retrieving all the documents (or, in the case of find_one, the first document).


In [19]:
db.people.find_one({'name': 'Alessandro'})


Out[19]:
{u'_id': ObjectId('558939057ab71c669f4c149a'),
 u'name': u'Alessandro',
 u'phone': u'+39123456789'}

Filters in MongoDB are described by documents themselves, so in the case of PyMongo they are dictionaries too. A filter can be specified in the form {'field': value}. By default filtering is performed by equality comparison; this can be changed by specifying a query operator in place of the value.

Query operators by convention start with a $ sign and can be specified as {'field': {'operator': value}}. The full list of query operators is available at http://docs.mongodb.org/manual/reference/operator/query/

For example, if we want to find every person whose ObjectId is greater than a given one, we can achieve that with:


In [38]:
from bson import ObjectId
db.people.find_one({'_id': {'$gt':  ObjectId('55893a1d7ab71c669f4c149e')}})


Out[38]:
{u'_id': ObjectId('55893be77ab71c669f4c149f'),
 u'name': u'Alessandro',
 u'phone': u'+39123456789'}

Updating Documents

Updating documents in MongoDB can be performed with the update method of the collection. Updating is actually one of the major sources of issues for new users, as it doesn't change individual values in the document like an UPDATE does on SQL based databases, but instead replaces the whole document with a new one.

Also note that the update operation doesn't update every document matched by the query: by default only the first matching document is updated. To apply it to multiple documents it is required to explicitly pass the multi=True option.

What you usually want is the $set operator, which changes fields of the existing document instead of replacing it with a new one.
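
For example, a minimal sketch of an update that touches every matching document (the extra ``country`` field here is made up purely for illustration):

```
# hypothetical example: add a 'country' field to every matching document;
# without multi=True only the first match would be updated
db.people.update({'name': 'Alessandro'},
                 {'$set': {'country': 'IT'}},
                 multi=True)
```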


In [45]:
doc = db.people.find_one({'name': 'Alessandro'})
print '\nBefore Update:', doc

db.people.update({'name': 'Alessandro'}, {'name': 'John Doe'})
doc = db.people.find_one({'name': 'John Doe'})
print '\nAfter Update:', doc

# Go back to previous state
db.people.update({'name': 'John Doe'}, {'$set': {'phone': '+39123456789'}})
print '\nAfter $set phone:', db.people.find_one({'name': 'John Doe'})
db.people.update({'name': 'John Doe'}, {'$set': {'name': 'Alessandro'}})
print '\nAfter $set name:', db.people.find_one({'name': 'Alessandro'})


Before Update: {u'phone': u'+39123456789', u'_id': ObjectId('53b30ff57ab71c051823b031'), u'name': u'Alessandro'}

After Update: {u'_id': ObjectId('53b30ff57ab71c051823b031'), u'name': u'John Doe'}

After $set phone: {u'phone': u'+39123456789', u'_id': ObjectId('53b30ff57ab71c051823b031'), u'name': u'John Doe'}

After $set name: {u'phone': u'+39123456789', u'_id': ObjectId('53b30ff57ab71c051823b031'), u'name': u'Alessandro'}

SubDocuments

The real power of MongoDB is unleashed when you use subdocuments.

As each MongoDB document is a JSON object (actually BSON, but that doesn't change much for the user), it can contain any data which is valid in JSON, including other documents and arrays. This replaces "relations" between collections in many use cases and is considerably more efficient, as it returns all the data in a single query instead of requiring multiple queries to retrieve the related data.

As MongoDB fully supports subdocuments it is also possible to query on sub document fields and even query on arrays using the dot notation.

For example if you want to store a blog post in mongodb you might actually store everything, including author data and tags inside the blogpost itself:


In [30]:
db.blog.insert({'title': 'MongoDB is great!',
                'author': {'name': 'Alessandro',
                           'surname': 'Molina',
                           'avatar': 'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png'},
                'tags': ['mongodb', 'web', 'scaling']})


Out[30]:
ObjectId('55893ee57ab71c669f4c14a0')

In [50]:
db.blog.find_one({'title': 'MongoDB is great!'})


Out[50]:
{u'_id': ObjectId('53b324d67ab71c051823b035'),
 u'author': {u'avatar': u'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png',
  u'name': u'Alessandro',
  u'surname': u'Molina'},
 u'tags': [u'mongodb', u'web', u'scaling'],
 u'title': u'MongoDB is great!'}

In [34]:
db.blog.find_one({'tags': 'mongodb'})


Out[34]:
{u'_id': ObjectId('55893ee57ab71c669f4c14a0'),
 u'author': {u'avatar': u'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png',
  u'name': u'Alessandro',
  u'surname': u'Molina'},
 u'tags': [u'mongodb', u'web', u'scaling'],
 u'title': u'MongoDB is great!'}

In [55]:
db.blog.find_one({'author.name': 'Alessandro'})


Out[55]:
{u'_id': ObjectId('53b324d67ab71c051823b035'),
 u'author': {u'avatar': u'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png',
  u'name': u'Alessandro',
  u'surname': u'Molina'},
 u'tags': [u'mongodb', u'web', u'scaling'],
 u'title': u'MongoDB is great!'}

In [5]:
TAGS = ['mongodb', 'web', 'scaling', 'cooking']

import random
for postnum in range(1, 5):
    db.blog.insert({'title': 'Post %s' % postnum,
                    'author': {'name': 'Alessandro',
                               'surname': 'Molina',
                               'avatar': 'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png'},
                    'tags': random.sample(TAGS, 2)})

In [40]:
for post in db.blog.find({'tags': {'$in': ['scaling', 'cooking']}}):
    print post['title'], '->', ', '.join(post['tags'])


MongoDB is great! -> mongodb, web, scaling
Post 1 -> mongodb, cooking
Post 2 -> cooking, web
Post 3 -> web, cooking
Post 4 -> cooking, mongodb

Indexing

Indexing is actually the most important part of MongoDB.

MongoDB has great support for indexing, and it supports single key, multi key, compound and hashed indexes. Each index type has its specific use case and can be used both for querying and sorting.

  • Single Key -> Those are plain indexes on a field
  • Multi Key -> Those are indexes created on an array field
  • Compound -> Those are indexes that cover more than one field.
  • Hashed -> Those are indexes optimized for equality comparison, they actually store the hash of the indexed value and are usually used for sharding.

Compound indexes can also be used when only part of the query filter is present in the index (a prefix of the indexed fields). There is also a special case called covering indexes: when all the fields you are asking for are available in the index, MongoDB won't even access the collection and will serve the data directly from the index. An index cannot be both a multi key index and a covering index.

Indexes are also ordered, so they can be created ASCENDING or DESCENDING.

Creating indexes can be done using the ensure_index method


In [6]:
db.blog.ensure_index([('tags', 1)])


Out[6]:
u'tags_1'
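
As a quick sketch of the ordering options mentioned above, a descending index can be created by passing -1 (or the pymongo.DESCENDING constant) as the direction; the field chosen here is just illustrative:

```
import pymongo

# sketch: a descending single key index on the people collection
db.people.ensure_index([('name', pymongo.DESCENDING)])
```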

Checking which index MongoDB is using to perform a query can be done using the explain method; forcing a specific index on a query can be done using the hint method.

As MongoDB uses a statistical optimizer, using hint in queries can actually provide a performance boost, as it avoids the "best option" lookup cost of the optimizer.


In [14]:
db.blog.find({'tags': 'mongodb'}).explain()['queryPlanner']['winningPlan']


Out[14]:
{u'inputStage': {u'direction': u'forward',
  u'indexBounds': {u'tags': [u'["mongodb", "mongodb"]']},
  u'indexName': u'tags_1',
  u'isMultiKey': True,
  u'keyPattern': {u'tags': 1},
  u'stage': u'IXSCAN'},
 u'stage': u'FETCH'}

In [15]:
db.blog.find({'tags': 'mongodb'}).hint([('_id', 1)]).explain()['queryPlanner']['winningPlan']


Out[15]:
{u'filter': {u'tags': {u'$eq': u'mongodb'}},
 u'inputStage': {u'direction': u'forward',
  u'indexBounds': {u'_id': [u'[MinKey, MaxKey]']},
  u'indexName': u'_id_',
  u'isMultiKey': False,
  u'keyPattern': {u'_id': 1},
  u'stage': u'IXSCAN'},
 u'stage': u'FETCH'}

In [16]:
db.blog.find({'title': 'Post 1'}).explain()['queryPlanner']['winningPlan']


Out[16]:
{u'direction': u'forward',
 u'filter': {u'title': {u'$eq': u'Post 1'}},
 u'stage': u'COLLSCAN'}

In [18]:
db.blog.ensure_index([('author.name', 1), ('title', 1)])
db.blog.find({'author.name': 'Alessandro'}, {'title': True, '_id': False}).explain()['queryPlanner']['winningPlan']


Out[18]:
{u'inputStage': {u'direction': u'forward',
  u'indexBounds': {u'author.name': [u'["Alessandro", "Alessandro"]'],
   u'title': [u'[MinKey, MaxKey]']},
  u'indexName': u'author.name_1_title_1',
  u'isMultiKey': False,
  u'keyPattern': {u'author.name': 1, u'title': 1},
  u'stage': u'IXSCAN'},
 u'stage': u'PROJECTION',
 u'transformBy': {u'_id': False, u'title': True}}

Aggregation Pipeline

The aggregation pipeline provided by the aggregation framework is a powerful MongoDB feature that lets you perform complex data analysis by passing documents through a pipeline of operations.

MongoDB was created with the core philosophy that you store your documents according to the way you are going to read them. So to properly design your schema you need to know how you are going to use the documents. While this approach provides great performance benefits and works well for web applications, it is not always feasible.

In case you need to perform some kind of analysis your documents are not optimized for, you can rely on the aggregation framework to build a pipeline that transforms them into a shape more practical for the kind of analysis you need.

How it works

The aggregation pipeline is a list of operations that are executed one after the other on the documents of the collection. The first operation is performed on all the documents, while each successive operation is performed on the result of the previous step.

If stages are able to take advantage of indexes they will; that is the case for a $match or $sort operator that appears at the beginning of the pipeline. All operators start with a $ sign.

Stage Operators

  • project Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document.
  • match Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).
  • limit Passes the first n documents unmodified to the pipeline where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).
  • skip Skips the first n documents where n is the specified skip number and passes the remaining documents unmodified to the pipeline. For each input document, outputs either zero documents (for the first n documents) or one document (if after the first n documents).
  • unwind Deconstructs an array field from the input documents to output a document for each element. Each output document replaces the array with an element value. For each input document, outputs n documents where n is the number of array elements and can be zero for an empty array.
  • group Groups input documents by a specified identifier expression and applies the accumulator expression(s), if specified, to each group. Consumes all input documents and outputs one document per each distinct group. The output documents only contain the identifier field and, if specified, accumulated fields.
  • sort Reorders the document stream by a specified sort key. Only the order changes; the documents remain unmodified. For each input document, outputs one document.
  • geoNear Returns an ordered stream of documents based on the proximity to a geospatial point. Incorporates the functionality of match, sort, and limit for geospatial data. The output documents include an additional distance field and can include a location identifier field.
  • out Writes the resulting documents of the aggregation pipeline to a collection. To use the $out stage, it must be the last stage in the pipeline.

Expression Operators

Each stage operator can work with one or more expression operators, which allow you to compute values and perform actions during that stage. For the full list of expression operators see http://docs.mongodb.org/manual/reference/operator/aggregation/#expression-operators
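
As a minimal sketch (reusing the blog collection from the previous examples), here $sum is the accumulator expression used inside a $group stage to count posts per author:

```
# sketch: count blog posts per author name using the $sum accumulator
list(db.blog.aggregate([
    {'$group': {'_id': '$author.name', 'posts': {'$sum': 1}}}
]))
```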

Pipeline Examples

The examples are based on the Twitter database from the same S3 bucket used in the MrJob examples, imported into MongoDB using:

$ curl -O http://panisson-bigdive.s3.amazonaws.com/twitter/2011-02-11/2011-02-11.json.aa.gz
$ gunzip 2011-02-11.json.aa.gz
$ mongoimport --db twitter --collection tweets /Users/adrianopagano/Desktop/Big_Dive/BigDive5/Data/2011-02-11.json.aa
2015-06-21T17:18:06.908+0200    connected to: localhost
2015-06-21T17:18:09.896+0200    [#########...............] twitter.tweets       19.6 MB/50.0 MB (39.3%)
2015-06-21T17:18:12.900+0200    [###################.....] twitter.tweets       41.1 MB/50.0 MB (82.2%)
2015-06-21T17:18:13.720+0200    imported 20000 documents

In [3]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client.twitter

# How many professors wrote a tweet?
print len(list(db.tweets.aggregate([
    {'$match': {'user.description': {'$regex': 'Professor'}}}
])))

# Count them using only the pipeline
print db.tweets.aggregate([
    {'$match': {'user.description': {'$regex': 'Professor'}}},
    {'$group': {'_id': 'count', 'count': {'$sum': 1}}}
]).next()['count']


# Hashtags frequency
print list(db.tweets.aggregate([
    {'$project': {'tags': '$entities.hashtags.text', '_id': 0}},
    {'$unwind': '$tags'},
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
    {'$match': {'count': {'$gt': 20}}}
]))


11
11
[{u'count': 24, u'_id': u'golmadrid'}, {u'count': 22, u'_id': u'1day'}, {u'count': 24, u'_id': u'GolMadrid'}, {u'count': 21, u'_id': u'jan25'}, {u'count': 35, u'_id': u'np'}, {u'count': 53, u'_id': u'ipod'}, {u'count': 21, u'_id': u'sougofollow'}, {u'count': 30, u'_id': u'AprendiComKpop'}, {u'count': 35, u'_id': u'Egypt'}, {u'count': 25, u'_id': u'KevinJForKCA2011'}, {u'count': 34, u'_id': u'nowplaying'}, {u'count': 22, u'_id': u'RT'}]

MapReduce

MongoDB is powered by the V8 JavaScript engine, which means that each mongod node is able to run JavaScript code. With a high enough number of mongod nodes, you actually end up with a powerful execution environment for distributed code that also copes with the major problem of data locality.

For this reason MongoDB exposes a mapReduce command which can be leveraged in sharded environments to run map reduce jobs. Note that the Aggregation Pipeline is usually faster than mapReduce, and it scales with the number of nodes just like mapReduce does, so you should rely on MapReduce only when the algorithm cannot be efficiently expressed with the Aggregation Pipeline.


In [17]:
freqs = db.tweets.map_reduce(
    map='''function() {
        var tags = this.entities.hashtags;
        for(var i=0; i<tags.length; i++)
            emit(tags[i].text, 1);
    }''',
    reduce='''function(key, values) {
        return Array.sum(values); 
    }''',
    out='tagsfrequency'
)

print(list(
    db.tagsfrequency.find({'value': {'$gt': 10}}).sort([('value', -1)])
))


[{u'_id': u'ipod', u'value': 53.0}, {u'_id': u'Egypt', u'value': 35.0}, {u'_id': u'np', u'value': 35.0}, {u'_id': u'nowplaying', u'value': 34.0}, {u'_id': u'AprendiComKpop', u'value': 30.0}, {u'_id': u'KevinJForKCA2011', u'value': 25.0}, {u'_id': u'GolMadrid', u'value': 24.0}, {u'_id': u'golmadrid', u'value': 24.0}, {u'_id': u'1day', u'value': 22.0}, {u'_id': u'RT', u'value': 22.0}, {u'_id': u'jan25', u'value': 21.0}, {u'_id': u'sougofollow', u'value': 21.0}, {u'_id': u'LiaRainhadoBloconoBBB', u'value': 16.0}, {u'_id': u'MuzikRadio', u'value': 16.0}, {u'_id': u'followmejp', u'value': 16.0}, {u'_id': u'Jan25', u'value': 15.0}, {u'_id': u'ahoetshirtwouldsay', u'value': 15.0}, {u'_id': u'fb', u'value': 15.0}, {u'_id': u'kevinjforkca2011', u'value': 15.0}, {u'_id': u'AHoeTshirtWouldSay', u'value': 14.0}, {u'_id': u'FF', u'value': 14.0}, {u'_id': u'Mubarak', u'value': 14.0}, {u'_id': u'TeamFollowBack', u'value': 14.0}, {u'_id': u'DamnItsTrue', u'value': 13.0}, {u'_id': u'NP', u'value': 13.0}, {u'_id': u'NeverSayNever3D', u'value': 13.0}, {u'_id': u'cigarrasclipe', u'value': 13.0}, {u'_id': u'jobs', u'value': 13.0}, {u'_id': u'egypt', u'value': 12.0}, {u'_id': u'followme', u'value': 12.0}, {u'_id': u'TeamGetzItOut', u'value': 11.0}, {u'_id': u'Touro', u'value': 11.0}, {u'_id': u'Twitition', u'value': 11.0}]

In [18]:
print freqs


Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), u'twitter'), u'tagsfrequency')

In [19]:
db.tweets.find_one()


Out[19]:
{u'_id': ObjectId('57625e5c347976e4a5e67a58'),
 u'contributors': None,
 u'coordinates': None,
 u'created_at': u'Fri Feb 11 00:00:00 +0000 2011',
 u'entities': {u'hashtags': [], u'urls': [], u'user_mentions': []},
 u'favorited': False,
 u'geo': None,
 u'id': 35850503103447040L,
 u'id_str': u'35850503103447040',
 u'in_reply_to_screen_name': None,
 u'in_reply_to_status_id': None,
 u'in_reply_to_status_id_str': None,
 u'in_reply_to_user_id': None,
 u'in_reply_to_user_id_str': None,
 u'place': None,
 u'retweet_count': 0,
 u'retweeted': False,
 u'source': u'<a href="http://twitter.com/" rel="nofollow">Twitter for iPhone</a>',
 u'text': u'The face of international Law will become very interesting with the advent of the cyber world',
 u'truncated': False,
 u'user': {u'contributors_enabled': False,
  u'created_at': u'Thu Apr 16 01:34:12 +0000 2009',
  u'description': u'Catholic, Doctoral Student.\r\nBleed234 (Naija)|Breathe225 (CIV)|Believe316 (John)\r\n#teamSalvation #teamNano #teamAfrica #teamSoccer #teamTHINKers  #TEAMTRUTH',
  u'favourites_count': 138,
  u'follow_request_sent': None,
  u'followers_count': 594,
  u'following': None,
  u'friends_count': 505,
  u'geo_enabled': False,
  u'id': 31591915,
  u'id_str': u'31591915',
  u'is_translator': False,
  u'lang': u'en',
  u'listed_count': 40,
  u'location': u'Recalculating.....',
  u'name': u'SirDarlington',
  u'notifications': None,
  u'profile_background_color': u'131516',
  u'profile_background_image_url': u'http://a0.twimg.com/profile_background_images/74754255/eucharist.jpg',
  u'profile_background_tile': True,
  u'profile_image_url': u'http://a2.twimg.com/profile_images/1232424126/DSC_0250_copy_normal.jpg',
  u'profile_link_color': u'009999',
  u'profile_sidebar_border_color': u'eeeeee',
  u'profile_sidebar_fill_color': u'efefef',
  u'profile_text_color': u'333333',
  u'profile_use_background_image': True,
  u'protected': False,
  u'screen_name': u'SirDarlington',
  u'show_all_inline_media': False,
  u'statuses_count': 15160,
  u'time_zone': u'Quito',
  u'url': u'http://darlingtonium.wordpress.com',
  u'utc_offset': -18000,
  u'verified': False}}

In [41]:
freqs = db.tweets.map_reduce(
    map='''function() {
        var tags = this.user.screen_name;
        emit(tags, 1);
    }''',
    reduce='''function(key, values) {
        return Array.sum(values); 
    }''',
    out='namefrequency'
)

print(list(
    db.namefrequency.find().sort([('value', -1)]).limit(10)
))


[{u'_id': u'classieSchutz91', u'value': 15.0}, {u'_id': u'taishaBruney982', u'value': 14.0}, {u'_id': u'danetteArras855', u'value': 13.0}, {u'_id': u'nichelleLicea51', u'value': 13.0}, {u'_id': u'IMonsterThisWay', u'value': 12.0}, {u'_id': u'coryInloes285', u'value': 10.0}, {u'_id': u'koriCanterberry', u'value': 10.0}, {u'_id': u'janyceGriffin14', u'value': 7.0}, {u'_id': u'kandiceLunz825', u'value': 7.0}, {u'_id': u'BI3B3RnB3ADL3S', u'value': 6.0}]

Exporting from MongoDB

There are cases where you might want to export the results of a MongoDB query so that they can be processed by another system; this might be the case for an EMR job that has to operate on data stored in MongoDB.

The simplest way to export that data in a format recognized by EMR and MrJob is the mongoexport tool provided with MongoDB itself. The tool can export data in a format recognized by MrJob's JSONValueProtocol, so you can upload it to S3 and process it directly from EMR.

For example, exporting all the documents tagged web can be easily done using:

$ ./mongoexport -d phonebook -c blog -o /tmp/data.json -q '{"tags": "web"}'

This will write the data to /tmp/data.json in a format recognized by JSONValueProtocol. The full list of options can be seen using --help; in the previous example the following options were used:

  • -d -> to specify the database
  • -c -> to specify the collection in the database
  • -o -> to write output to /tmp/data.json
  • -q -> to filter output by the provided query
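
As a hedged sketch of the consuming side (the job below and its tag counting are purely illustrative, assuming mrjob is installed), the exported file can be read by declaring JSONValueProtocol as the input protocol:

```
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class TagCounter(MRJob):
    # every line of the exported file is decoded into a document
    INPUT_PROTOCOL = JSONValueProtocol

    def mapper(self, _, doc):
        for tag in doc.get('tags', []):
            yield tag, 1

    def reducer(self, tag, counts):
        yield tag, sum(counts)


if __name__ == '__main__':
    TagCounter.run()
```

Such a job could then be run locally against the exported file, or on EMR once the file has been uploaded to S3.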

Sharding

Sharding, or horizontal scaling, divides the data set and distributes the data over multiple servers, or shards. Each shard is an independent database, and collectively, the shards make up a single logical database.

Chunk: the whole data set is divided into chunks; chunks are then distributed as evenly as possible across all the nodes.

Shard Key: the document property on which chunks are based. The range of possible shard key values is divided into chunks and each chunk is assigned to a node; documents with nearby shard key values end up in the same chunk, and therefore on the same node.

Shard: each MongoDB node or ReplicaSet that contains part of the sharded data.

Router: the router (mongos) is the interface to the cluster; every query and operation is performed against the router, which is in charge of forwarding the operation to one or multiple shards and gathering the results.

Config Server: the config servers keep track of chunk distribution; they know which shard contains which chunk and which values are kept inside each chunk. Whenever the router has to perform an operation, or has to split chunks that have become too big, it reads and writes the chunk distribution from the config servers.

Setting Up a Sharded Cluster

To properly set up a sharded environment at least 1 mongos, 2 shards and 1 config server are required. That's the minimum for a test environment and it is not suitable for production usage.

First we need to create the directories for each node:

$ mkdir /tmp/mongocluster
$ mkdir /tmp/mongocluster/n0
$ mkdir /tmp/mongocluster/n1
$ mkdir /tmp/mongocluster/n2
$ mkdir /tmp/mongocluster/c0

Then we need to start the shards:

$ mongod --port 27016 --dbpath /tmp/mongocluster/n0
$ mongod --port 27015 --dbpath /tmp/mongocluster/n1
$ mongod --port 27014 --dbpath /tmp/mongocluster/n2

Then we need to start at least a config server:

$ mongod --configsvr --dbpath /tmp/mongocluster/c0 --port 27019

Now that all the required nodes are up, we can finally start the mongos router, which is in charge of actually providing the sharding functionality:

$ mongos --configdb 127.0.0.1:27019

Now all the required nodes are up and running, but we still haven't configured sharding. The first step in setting up a sharded environment is to add the nodes to the cluster. To do so we need to connect to the mongos and issue the sh.addShard command:

```
$ mongo
MongoDB shell version: 3.0.4
connecting to: test
mongos> sh.addShard('127.0.0.1:27016')
{ "shardAdded" : "shard0000", "ok" : 1 }
mongos> sh.addShard('127.0.0.1:27015')
{ "shardAdded" : "shard0001", "ok" : 1 }
mongos> sh.addShard('127.0.0.1:27014')
{ "shardAdded" : "shard0002", "ok" : 1 }
```

Now that our shards have been added to the cluster we can turn on sharding for **databases** and **collections**.
Only sharded collections will actually be sharded across the nodes.
We are going to shard our collection of tweets, so the first step is to enable sharding for the ``twitter`` database:

mongos> sh.enableSharding('twitter')
{ "ok" : 1 }


Now we need to provide the actual sharding key for our ``tweets`` collection. Until a sharding key is provided, no sharding happens. To ensure that tweets are properly distributed across nodes we are going to shard by the screen name of the author:

mongos> sh.shardCollection("twitter.tweets", {'user.screen_name': 1})
{ "collectionsharded" : "twitter.tweets", "ok" : 1 }


Now we can finally import our data and see that it gets distributed across the nodes:

$ mongoimport --db twitter --collection tweets 2011-02-11.json.aa


To check that our data has been properly distributed across the nodes:

```
mongos> use twitter
switched to db twitter
mongos> db.printShardingStatus()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("558fc718712e65efc2a378d9")
  }
  shards:
    {  "_id" : "shard0000",  "host" : "localhost:27016" }
    {  "_id" : "shard0001",  "host" : "localhost:27015" }
    {  "_id" : "shard0002",  "host" : "localhost:27014" }
  balancer:
    Currently enabled:  yes
    Currently running:  no
    Failed balancer rounds in last 5 attempts:  0
    Migration Results for the last 24 hours:
      4 : Success
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : false,  "primary" : "shard0002" }
    {  "_id" : "twitter",  "partitioned" : true,  "primary" : "shard0002" }
      twitter.tweets
        shard key: { "user.screen_name" : 1 }
        chunks:
          shard0000  2
          shard0001  2
          shard0002  2
        { "user.screen_name" : { "$minKey" : 1 } } -->> { "user.screen_name" : "111111121111111" } on : shard0000 Timestamp(2, 0)
        { "user.screen_name" : "111111121111111" } -->> { "user.screen_name" : "YohannaCS" } on : shard0001 Timestamp(3, 0)
        { "user.screen_name" : "YohannaCS" } -->> { "user.screen_name" : "graciadelcielo" } on : shard0000 Timestamp(4, 0)
        { "user.screen_name" : "graciadelcielo" } -->> { "user.screen_name" : "nosso_surita" } on : shard0001 Timestamp(5, 0)
        { "user.screen_name" : "nosso_surita" } -->> { "user.screen_name" : "yuuki_gei" } on : shard0002 Timestamp(5, 1)
        { "user.screen_name" : "yuuki_gei" } -->> { "user.screen_name" : { "$maxKey" : 1 } } on : shard0002 Timestamp(1, 6)
```

NOTE: Sharding by a date or other sequential values is usually not a good idea, as you end up concentrating all the write workload on the shard that contains the most recent data.


In [19]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client.twitter

db.tweets.map_reduce(
    map='''function() {
        var tags = this.entities.hashtags;
        for(var i=0; i<tags.length; i++)
            emit(tags[i].text, 1);
    }''',
    reduce='''function(key, values) {
        return Array.sum(values); 
    }''',
    out='tagsfrequency'
)

print(list(
    db.tagsfrequency.find({'value': {'$gt': 10}})
))


[{u'_id': u'1day', u'value': 13.0}, {u'_id': u'AprendiComKpop', u'value': 24.0}, {u'_id': u'DamnItsTrue', u'value': 11.0}, {u'_id': u'Egypt', u'value': 29.0}, {u'_id': u'FF', u'value': 13.0}, {u'_id': u'GolMadrid', u'value': 12.0}, {u'_id': u'KevinJForKCA2011', u'value': 18.0}, {u'_id': u'LiaRainhadoBloconoBBB', u'value': 15.0}, {u'_id': u'MuzikRadio', u'value': 11.0}, {u'_id': u'NeverSayNever3D', u'value': 11.0}, {u'_id': u'RT', u'value': 19.0}, {u'_id': u'TeamFollowBack', u'value': 12.0}, {u'_id': u'TeamGetzItOut', u'value': 11.0}, {u'_id': u'ahoetshirtwouldsay', u'value': 14.0}, {u'_id': u'cigarrasclipe', u'value': 13.0}, {u'_id': u'fb', u'value': 11.0}, {u'_id': u'followme', u'value': 11.0}, {u'_id': u'followmejp', u'value': 15.0}, {u'_id': u'golmadrid', u'value': 24.0}, {u'_id': u'ipod', u'value': 41.0}, {u'_id': u'jan25', u'value': 17.0}, {u'_id': u'jobs', u'value': 13.0}, {u'_id': u'kevinjforkca2011', u'value': 12.0}, {u'_id': u'nowplaying', u'value': 28.0}, {u'_id': u'np', u'value': 28.0}, {u'_id': u'sougofollow', u'value': 19.0}]

You will notice that the map_reduce command has now been properly split across the nodes of our cluster. Our shards should report in their logs something like:

2015-06-28T12:31:58.231+0200 I COMMAND  [conn4] command twitter.$cmd command: mapReduce { mapreduce: "tweets", map: "function() {
        var tags = this.entities.hashtags;
        for(var i=0; i<tags.length; i++)
            emit(tags[i].text, 1);
    }", reduce: "function(key, values) {
        return Array.sum(values); 
    }", out: "tmp.mrs.tweets_1435487518_0", shardedFirstPass: true } ntoreturn:1 keyUpdates:0 writeConflicts:0 numYields:0 reslen:151 locks:{ Global: { acquireCount: { r: 2225, w: 1068, W: 3 } }, MMAPV1Journal: { acquireCount: { r: 575, w: 2130 } }, Database: { acquireCount: { r: 535, w: 1060, R: 42, W: 11 } }, Collection: { acquireCount: { R: 535, W: 1063 } }, Metadata: { acquireCount: { W: 8 } } } 102ms

Performances

Journal Performances

  • Write performance is reduced by 5-30%
  • For apps that are write-heavy (1000+ writes per server) there can be slowdowns due to the mix of journal and data flushes.

To avoid journal overhead, save the journal on a separate disk from the data; this lowers the journal overhead to around 3%.

Fragmentation

Files can get fragmented over time as remove() and update() operations are issued.

  • It gets worse if documents have varied sizes
  • Fragmentation wastes disk space and RAM
  • It also makes writes scattered and slower (MongoDB has to look up an empty slot in the extent)
  • Fragmentation can be checked by comparing size to storageSize in the collection’s stats (see the sketch after this list)
  • nmoved=1 in the logs means the document has been resized and moved to another extent
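
A minimal sketch of that check with PyMongo (using the tweets collection from the earlier examples):

```
# sketch: compare the logical data size with the allocated storage size;
# a large gap between the two suggests fragmentation
stats = db.command('collstats', 'tweets')
print stats['size'], stats['storageSize']
```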

PowerOf2Allocation is the default since 2.6 and is more efficient for updates and removes: each record gets a size in bytes that is a power of 2 (e.g. 32, 64, 128, 256, 512...), so updated documents usually don't need to be moved (a 200 byte document has up to 56 spare bytes before needing to be reallocated), and a deleted document leaves a slot that can be reused by another document, since sizes are rounded to the same powers of 2.

https://github.com/10gen-labs/storage-viz helps debugging storage, RAM and fragmentation.

Replication Lag

  • Secondaries underspec’d vs primaries
  • Access patterns between primary and secondaries
  • Insufficient bandwidth (estimate the required bandwidth to sync as op/sec * docsize + 40%; see the sketch after this list)
  • Foreground index builds on secondaries
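
A back-of-the-envelope sketch of that bandwidth estimate (the rate and document size are made-up numbers):

```
# hypothetical numbers: 500 ops/sec, 2 KB average document size, +40% overhead
ops_per_sec = 500
avg_doc_size = 2 * 1024                       # bytes
required = ops_per_sec * avg_doc_size * 1.4   # bytes/sec
print '%.2f MB/s' % (required / 1024.0 / 1024.0)  # ~1.37 MB/s
```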

https://github.com/rueckstiess/mtools helps debugging operations logs and slow replication

PySpark
=======

An RDD can be thought of as a huge list without any keys.

  • Driver -> The node that asks for execution
  • Master -> The node that coordinates all the nodes
  • Worker -> The node that actually performs the computation

Transformations

RDD = [1,2,3]

  • map -> applying a function that returns a list for each element gives an RDD of lists, e.g. [[1], [1,1], [1,1,1]]
  • flatMap -> the same function applied with flatMap gives a flattened RDD, e.g. [1,1,1,1,1,1] (see the sketch after this list)
  • filter -> keeps only the elements matching a predicate, e.g. [2]
  • union -> RDD1 + RDD2
  • intersection -> RDD1 & RDD2
  • reduceByKey -> works on RDDs of (key, value) pairs, combining the values of each key
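
A minimal sketch of the map/flatMap difference described above (assuming the sc SparkContext available in the notebook below):

```
rdd = sc.parallelize([1, 2, 3])

def ones(x):
    return [1] * x          # a list with x ones

print rdd.map(ones).collect()      # [[1], [1, 1], [1, 1, 1]]
print rdd.flatMap(ones).collect()  # [1, 1, 1, 1, 1, 1]
```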

Actions

RDD = [1,2,3]

  • reduce() = 6
  • collect() = [1,2,3]
  • take(1) = [1]

How to download Spark

To use Spark you need Java.

Download http://blackhole.test.axantweb.com/pyspark-1.6.1.tgz

To check that it works, extract the archive, cd into it and run ./pyspark.sh.

If you get an ``Exception in thread "main"`` error, the following workaround (re-linking the system Java) may help:

sudo rm /usr/bin/java

sudo ln -s /Library/Internet\ Plug-Ins/JavaAppletPlugin.plugin/Contents/Home/bin/java /usr/bin

To use Spark in Jupyter:

  1. Activate the virtualenv where Jupyter Notebook is installed
  2. Run ``IPYTHON_OPTS="notebook" ./pyspark.sh``
  3. If the notebooks live in another directory: ``IPYTHON_OPTS="notebook --notebook-dir=~/path/to/notebooks" ./pyspark.sh``

In [1]:
sc


Out[1]:
<pyspark.context.SparkContext at 0x10f25f390>

In [2]:
data = sc.parallelize([1,2,3,4,5,6])

In [3]:
data.first()


Out[3]:
1

In [4]:
def multiply(v):
    return v*2

In [7]:
multdata = data.map(multiply)
print multdata


PythonRDD[2] at RDD at PythonRDD.scala:43

In [13]:
multdata.collect()


Out[13]:
[2, 4, 6, 8, 10, 12]

In [9]:
sc.defaultParallelism


Out[9]:
4

In [12]:
values = range(20)
print values


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [19]:
rddvalues = sc.parallelize(values)

def sumall(i):
    yield list(i) # returns the list on each core (4 cores available)
    
rddvalues.mapPartitions(sumall).collect()


Out[19]:
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]

In [20]:
def summap(v):
    return v+1

rddvalues.map(summap).collect()


Out[20]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [21]:
def sumtwo(a, b):
    return a+b

rddvalues.reduce(sumtwo)


Out[21]:
190

When working on one value at a time, use map.

When combining more than one value, use reduce.


In [23]:
low = sc.parallelize(range(1,10))
high = sc.parallelize(range(5,15))

In [24]:
low.union(high).collect()


Out[24]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

In [25]:
low.intersection(high).collect()


Out[25]:
[8, 9, 5, 6, 7]

In [27]:
low.union(high).distinct().collect()


Out[27]:
[8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

In [56]:
text = sc.textFile('./Jungle_Book.txt')

In [34]:
print text


./Jungle_Book.txt MapPartitionsRDD[37] at textFile at NativeMethodAccessorImpl.java:-2

In [57]:
text.count()


Out[57]:
5803

In [45]:
def splitlines(line):
    return line.split()

words = text.flatMap(splitlines)

In [46]:
words.count()


Out[46]:
53856

In [47]:
words.take(15)


Out[47]:
[u'The',
 u'Project',
 u'Gutenberg',
 u'EBook',
 u'of',
 u'The',
 u'Jungle',
 u'Book,',
 u'by',
 u'Rudyard',
 u'Kipling',
 u'This',
 u'eBook',
 u'is',
 u'for']

In [107]:
def get_freq(word):
    return word, 1

def get_count(a, b): # this will be used with reduceByKey
    return a+b

def switch_tuple(t): # this is needed to sort by Key (flips the passed tuple)
    return t[1], t[0]

print text.flatMap(splitlines).map(get_freq).reduceByKey(get_count).map(switch_tuple).sortByKey(0, 1).collect()[:20]


[(3337, u'the'), (2166, u'and'), (1287, u'of'), (1214, u'to'), (1092, u'a'), (830, u'he'), (700, u'in'), (633, u'his'), (569, u'that'), (504, u'was'), (470, u'I'), (432, u'is'), (423, u'for'), (412, u'with'), (398, u'as'), (363, u'said'), (325, u'on'), (287, u'not'), (284, u'all'), (281, u'had')]
