In Python, MongoDB support is provided by the PyMongo library, which can be installed with:
```
$ pip install pymongo
```
Installing MongoDB
------------------
Installing MongoDB is as simple as going to http://www.mongodb.org/downloads and downloading the package for your platform.
Extract the downloaded package, create a ``data`` directory for the database files, then start ``mongod`` pointing at it:
```
$ curl -O 'https://fastdl.mongodb.org/osx/mongodb-osx-x86_64-3.0.4.tgz'
$ tar zxvf mongodb-osx-x86_64-3.0.4.tgz
$ cd mongodb-osx-x86_64-3.0.4
$ mkdir data
$ ./bin/mongod --dbpath=./data
```
A MongoClient instance provides a connection to a MongoDB server. Each server can host multiple databases, which can be retrieved with ``connection.database_name``; each database can then contain multiple collections with different documents.
In [2]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
In [4]:
db = client.phonebook
print db.collection_names()
Once the database is retrieved, collections can be accessed as attributes of the database itself.
A MongoDB document is actually just a Python dictionary: inserting a document is as simple as telling PyMongo to insert the dictionary into the collection. Each document can have its own structure and can contain different data; you are not required to declare any structure for the collection. Collections that do not exist yet are automatically created on the insertion of the first document.
In [15]:
data = {'name': 'Alessandro', 'phone': '+39123456789'}
db.people.insert(data)
Out[15]:
In [6]:
print db.collection_names()
Each inserted document receives an ObjectId, a unique identifier of the document. The ObjectId is built from data such as the current timestamp, a server identifier, the process id and other values that guarantee it to be unique across multiple servers.
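As a quick hedged illustration (``generation_time`` is part of the ``bson`` package shipped with PyMongo), the timestamp embedded in an ObjectId can be read back from it:
```
from bson import ObjectId

oid = ObjectId()  # generated client side, before anything is written
print oid, oid.generation_time  # the creation timestamp embedded in the id
```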
Being designed to work in a distributed, multi-node environment, MongoDB expresses "write safety" as the number of servers that must have saved the document before the insert command is considered completed.
This is handled by the ``w`` option, which indicates the number of servers that must have saved the document before the insert command returns. Setting it to ``0`` makes MongoDB work in fire-and-forget mode, which is useful when inserting a lot of documents quickly. As most drivers generate the ObjectId on the client that performs the insertion, you will receive an ObjectId even before the document has been written.
In [10]:
db.people.insert({'name': 'Puria', 'phone': '+39123456788', 'other_phone': '+3933332323'}, w=0)
Out[10]:
In [13]:
try:
db.people.insert({'name': 'Puria', 'phone': '+39123456789'}, w=2)
except Exception as e:
print e
Fetching back inserted documents can be done using the ``find`` and ``find_one`` methods of collections. Both methods accept a query expression that filters the returned documents. Omitting it means retrieving all the documents (or, in the case of ``find_one``, the first document).
In [19]:
db.people.find_one({'name': 'Alessandro'})
Out[19]:
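As a small sketch (assuming the documents inserted above), ``find`` without a filter returns a cursor over every document in the collection, which can be iterated directly:
```
for person in db.people.find():
    print person['name'], person['phone']
```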
Filters in MongoDB are described by documents themselves, so in the case of PyMongo they are dictionaries too.
A filter can be specified in the form ``{'field': value}``.
By default filtering is performed by equality comparison; this can be changed by specifying a query operator in place of the value.
Query operators by convention start with a ``$`` sign and are specified as ``{'field': {'operator': value}}``.
The full list of query operators is available at http://docs.mongodb.org/manual/reference/operator/query/
For example, if we want to find a person whose ObjectId is greater than ``55893a1d7ab71c669f4c149e``, we can achieve that with:
In [38]:
from bson import ObjectId
db.people.find_one({'_id': {'$gt': ObjectId('55893a1d7ab71c669f4c149e')}})
Out[38]:
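A few more filters, sketched here as an illustration on the same ``people`` collection (the operators are standard MongoDB query operators; the field names are the ones used in the inserts above):
```
# match any of a list of values
db.people.find_one({'name': {'$in': ['Alessandro', 'Puria']}})

# match documents where a field exists at all
db.people.find_one({'other_phone': {'$exists': True}})

# match with a regular expression
db.people.find_one({'phone': {'$regex': r'^\+39'}})
```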
Updating documents in MongoDB is performed with the ``update`` method of the collection. Updating is actually one of the major sources of issues for new users: it doesn't change values inside the document like it does on SQL based databases, but instead replaces the whole document with a new one.
Also note that the update operation doesn't touch every document matched by the query; by default only the first matching document is updated. To apply it to multiple documents you must explicitly pass the ``multi=True`` option (see the sketch after the next example).
What you usually want is the ``$set`` operator, which changes fields of the existing document instead of replacing it with a new one.
In [45]:
doc = db.people.find_one({'name': 'Alessandro'})
print '\nBefore Update:', doc
db.people.update({'name': 'Alessandro'}, {'name': 'John Doe'})
doc = db.people.find_one({'name': 'John Doe'})
print '\nAfter Update:', doc
# Go back to previous state
db.people.update({'name': 'John Doe'}, {'$set': {'phone': '+39123456789'}})
print '\nAfter $set phone:', db.people.find_one({'name': 'John Doe'})
db.people.update({'name': 'John Doe'}, {'$set': {'name': 'Alessandro'}})
print '\nAfter $set name:', db.people.find_one({'name': 'Alessandro'})
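To update every matching document instead of just the first one, pass ``multi=True``. A short sketch (the ``country`` field here is made up just for the example):
```
# stamp a country on every person with an Italian phone number
db.people.update({'phone': {'$regex': r'^\+39'}},
                 {'$set': {'country': 'IT'}},
                 multi=True)
print db.people.find({'country': 'IT'}).count()
```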
The real power of MongoDB is unlocked when you use subdocuments.
As each MongoDB document is a JSON object (actually BSON, but that doesn't change much for the user), it can contain any data which is valid in JSON, including other documents and arrays. In many use cases this replaces "relations" between collections and is far more efficient, as it returns all the data in a single query instead of requiring multiple queries to retrieve related data.
As MongoDB fully supports subdocuments, it is also possible to query on subdocument fields, and even on arrays, using the dot notation.
For example, if you want to store a blog post in MongoDB you might actually store everything, including author data and tags, inside the blog post itself:
In [30]:
db.blog.insert({'title': 'MongoDB is great!',
'author': {'name': 'Alessandro',
'surname': 'Molina',
'avatar': 'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png'},
'tags': ['mongodb', 'web', 'scaling']})
Out[30]:
In [50]:
db.blog.find_one({'title': 'MongoDB is great!'})
Out[50]:
In [34]:
db.blog.find_one({'tags': 'mongodb'})
Out[34]:
In [55]:
db.blog.find_one({'author.name': 'Alessandro'})
Out[55]:
In [5]:
TAGS = ['mongodb', 'web', 'scaling', 'cooking']
import random
for postnum in range(1, 5):
db.blog.insert({'title': 'Post %s' % postnum,
'author': {'name': 'Alessandro',
'surname': 'Molina',
'avatar': 'http://www.gravatar.com/avatar/7a952cebb086d2114080b4b39ed83cad.png'},
'tags': random.sample(TAGS, 2)})
In [40]:
for post in db.blog.find({'tags': {'$in': ['scaling', 'cooking']}}):
print post['title'], '->', ', '.join(post['tags'])
Indexing is actually the most important part of MongoDB.
MongoDB has great support for indexing: it supports single key, multi key, compound and hashed indexes. Each index type has its specific use case, and indexes can be used both for querying and sorting.
Compound indexes can also be used when only a part of the query filter is present in the index. There is also a special case called covering indexes, which happens when all the fields you ask for are available in the index: in that case MongoDB won't even access the collection and will serve the data directly from the index. An index cannot be both a multi key index and a covering index.
Indexes are also ordered, so they can be created ASCENDING or DESCENDING.
Creating indexes can be done using the ``ensure_index`` method.
In [6]:
db.blog.ensure_index([('tags', 1)])
Out[6]:
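The same method also accepts other directions and index types. A hedged sketch (the fields chosen here are just for illustration):
```
from pymongo import DESCENDING, HASHED

# descending single key index, handy for newest-first sorting
db.blog.ensure_index([('title', DESCENDING)])

# hashed index, commonly used for hash based shard keys
db.blog.ensure_index([('author.name', HASHED)])
```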
Checking which index MongoDB is using to perform a query can be done with the ``explain`` method; forcing a specific index on a query can be done with the ``hint`` method.
As MongoDB uses a statistical optimizer, using ``hint`` in queries can actually provide a performance boost, as it avoids the "best option" lookup cost of the optimizer.
In [14]:
db.blog.find({'tags': 'mongodb'}).explain()['queryPlanner']['winningPlan']
Out[14]:
In [15]:
db.blog.find({'tags': 'mongodb'}).hint([('_id', 1)]).explain()['queryPlanner']['winningPlan']
Out[15]:
In [16]:
db.blog.find({'title': 'Post 1'}).explain()['queryPlanner']['winningPlan']
Out[16]:
In [18]:
db.blog.ensure_index([('author.name', 1), ('title', 1)])
db.blog.find({'author.name': 'Alessandro'}, {'title': True, '_id': False}).explain()['queryPlanner']['winningPlan']
Out[18]:
The aggregation pipeline provided by the aggregation framework is a powerful MongoDB feature that permits complex data analysis by passing the documents through a pipeline of operations.
MongoDB was created with the core philosophy that you store your documents depending on the way you are going to read them, so to properly design your schema you need to know how you are going to use the documents. While this approach provides great performance benefits and works well for web applications, it might not always be feasible.
In case you need to perform some kind of analysis your documents are not optimized for, you can rely on the aggregation framework to create a pipeline that transforms them into a shape more practical for the analysis you need.
The aggregation pipeline is a list of operations that get executed one after the other on the documents of the collection. The first operation is performed on all the documents, while successive operations are performed on the result of the previous steps.
Steps that can take advantage of indexes will do so; that is the case for a ``$match`` or ``$sort`` operator appearing at the beginning of the pipeline. All stage operators start with a ``$`` sign.
Each stage operator can work with one or more expression operators, which allow performing actions during that stage; for a list of expression operators see http://docs.mongodb.org/manual/reference/operator/aggregation/#expression-operators
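Before moving to a bigger dataset, here is a minimal sketch of a pipeline on the ``blog`` collection used earlier: a ``$match`` stage first (so it can use an index), then ``$unwind`` and ``$group`` to count posts per tag.
```
pipeline = [
    {'$match': {'author.name': 'Alessandro'}},  # first stage, can use an index
    {'$unwind': '$tags'},                       # one document per tag
    {'$group': {'_id': '$tags', 'count': {'$sum': 1}}}
]
for row in db.blog.aggregate(pipeline):
    print row['_id'], row['count']
```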
The following examples are based on the twitter database from the same S3 bucket used in the MrJob examples, imported into MongoDB using:
$ curl -O http://panisson-bigdive.s3.amazonaws.com/twitter/2011-02-11/2011-02-11.json.aa.gz
$ gunzip 2011-02-11.json.aa.gz
$ mongoimport --db twitter --collection tweets /Users/adrianopagano/Desktop/Big_Dive/BigDive5/Data/2011-02-11.json.aa
2015-06-21T17:18:06.908+0200 connected to: localhost
2015-06-21T17:18:09.896+0200 [#########...............] twitter.tweets 19.6 MB/50.0 MB (39.3%)
2015-06-21T17:18:12.900+0200 [###################.....] twitter.tweets 41.1 MB/50.0 MB (82.2%)
2015-06-21T17:18:13.720+0200 imported 20000 documents
In [3]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client.twitter
# How many tweets were written by users who describe themselves as Professor?
print len(list(db.tweets.aggregate([
{'$match': {'user.description': {'$regex': 'Professor'}}}
])))
# Count them using only the pipeline
print db.tweets.aggregate([
{'$match': {'user.description': {'$regex': 'Professor'}}},
{'$group': {'_id': 'count', 'count': {'$sum': 1}}}
]).next()['count']
# Hashtags frequency
print list(db.tweets.aggregate([
{'$project': {'tags': '$entities.hashtags.text', '_id': 0}},
{'$unwind': '$tags'},
{'$group': {'_id': '$tags', 'count': {'$sum': 1}}},
{'$match': {'count': {'$gt': 20}}}
]))
MongoDB embeds the V8 JavaScript engine, which means that each mongod node is able to run JavaScript code. With a high enough number of mongod nodes you end up with a powerful execution environment for distributed code that also copes with the major problem of data locality.
For this reason MongoDB exposes a mapReduce command which can be leveraged in sharded environments to run map reduce jobs. Note that the Aggregation Pipeline is usually faster than the mapReduce feature and scales with the number of nodes just like mapReduce does, so you should rely on mapReduce only when the algorithm cannot be efficiently expressed with the Aggregation Pipeline.
In [17]:
freqs = db.tweets.map_reduce(
map='''function() {
var tags = this.entities.hashtags;
for(var i=0; i<tags.length; i++)
emit(tags[i].text, 1);
}''',
reduce='''function(key, values) {
return Array.sum(values);
}''',
out='tagsfrequency'
)
print(list(
db.tagsfrequency.find({'value': {'$gt': 10}}).sort([('value', -1)])
))
In [18]:
print freqs
In [19]:
db.tweets.find_one()
Out[19]:
In [41]:
freqs = db.tweets.map_reduce(
map='''function() {
var tags = this.user.screen_name;
emit(tags, 1);
}''',
reduce='''function(key, values) {
return Array.sum(values);
}''',
out='namefrequency'
)
print(list(
db.namefrequency.find().sort([('value', -1)]).limit(10)
))
There are cases where you might want to export the results of a MongoDB query so they can be processed by another system; this might be the case for an EMR job that has to perform operations on data stored in MongoDB.
The simplest way to export that data in a format recognized by EMR and MrJob is the ``mongoexport`` tool provided with MongoDB itself. The tool is able to export data in a format recognized by the MrJob ``JSONValueProtocol``, so you can upload it to S3 and process it directly from EMR.
For example, exporting all the documents tagged ``web`` can easily be done with:
$ ./mongoexport -d phonebook -c blog -o /tmp/data.json -q '{"tags": "web"}'
This will write the data to /tmp/data.json in a format recognized by ``JSONValueProtocol``. The full list of options can be seen with ``--help``; in the previous example the following options were used:

- ``-d`` the database to export from (``phonebook``)
- ``-c`` the collection to export (``blog``)
- ``-o`` the output file
- ``-q`` the query filter that selects the documents to export
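As a hedged sketch of how the exported file could then be consumed on the MrJob side (the job and field names below are made up for the example; ``JSONValueProtocol`` decodes each line into a Python dictionary):
```
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class TagCount(MRJob):
    # every input line is one document written by mongoexport
    INPUT_PROTOCOL = JSONValueProtocol

    def mapper(self, _, doc):
        for tag in doc.get('tags', []):
            yield tag, 1

    def reducer(self, tag, counts):
        yield tag, sum(counts)


if __name__ == '__main__':
    TagCount.run()
```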
Sharding, or horizontal scaling, divides the data set and distributes the data over multiple servers, or shards. Each shard is an independent database, and collectively, the shards make up a single logical database.
**Chunk** The whole data set is divided into chunks; chunks are then distributed as equally as possible across all the nodes.
**Shard Key** The document property on which chunks are decided: the range of possible shard key values is divided into chunks and each chunk is assigned to a node. Documents with nearby values for the shard key end up in the same chunk, and therefore on the same node.
**Shard** Each MongoDB node or ReplicaSet that contains part of the sharded data.
**Router** The router is the interface to the cluster: each query and operation is performed against the router, which is in charge of forwarding the operation to one or more shards and gathering the results.
**Config Server** The config servers keep track of chunk distribution: they know which shard contains which chunk and which values are kept inside each chunk. Whenever the router has to perform an operation, or split chunks that became too big, it reads and writes chunk distribution from the config servers.
To properly set up a sharded environment, at least 1 mongos, 2 shards and 1 config server are required. That's the minimum for a test environment and is not suitable for production usage.
First we need to create the directories for each node:
$ mkdir /tmp/mongocluster
$ mkdir /tmp/mongocluster/n0
$ mkdir /tmp/mongocluster/n1
$ mkdir /tmp/mongocluster/n2
$ mkdir /tmp/mongocluster/c0
Then we need to start the shards:
$ mongod --port 27016 --dbpath /tmp/mongocluster/n0
$ mongod --port 27015 --dbpath /tmp/mongocluster/n1
$ mongod --port 27014 --dbpath /tmp/mongocluster/n2
Then we need to start at least a config server:
$ mongod --configsvr --dbpath /tmp/mongocluster/c0 --port 27019
Now that all the required nodes are up, we can finally start the mongos router, which is in charge of actually providing the sharding functionality:
$ mongos --configdb 127.0.0.1:27019
Now all the required nodes are up and running, but we still haven't configured the sharded environment.
The first step is to add the shards to the cluster.
To do so we need to connect to the ``mongos`` and issue the ``sh.addShard`` command:
```
$ mongo
MongoDB shell version: 3.0.4
connecting to: test
mongos> sh.addShard('127.0.0.1:27016')
{ "shardAdded" : "shard0000", "ok" : 1 }
mongos> sh.addShard('127.0.0.1:27015')
{ "shardAdded" : "shard0001", "ok" : 1 }
mongos> sh.addShard('127.0.0.1:27014')
{ "shardAdded" : "shard0002", "ok" : 1 }
```
Now that our shards have been added to the cluster we can turn on sharding for **databases** and **collections**.
Only collections for which sharding is enabled will actually be distributed across the nodes.
We are going to shard our collection of tweets, so the first step is to enable sharding for the ``twitter`` database:
mongos> sh.enableSharding('twitter')
{ "ok" : 1 }
Now we need to provide the actual sharding key for our ``tweets`` collection. Until a sharding key is provided, no sharding happens. To ensure that tweets are properly distributed across nodes we are going to shard by the screen name of the author:
mongos> sh.shardCollection("twitter.tweets", {'user.screen_name': 1})
{ "collectionsharded" : "twitter.tweets", "ok" : 1 }
Now we can finally import our data and see that it gets distributed across the nodes:
$ mongoimport --db twitter --collection tweets 2011-02-11.json.aa
To check that our data has been properly distributed across the nodes:
```
mongos> use twitter
switched to db twitter
mongos> db.printShardingStatus()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("558fc718712e65efc2a378d9")
  }
  shards:
    {  "_id" : "shard0000",  "host" : "localhost:27016" }
    {  "_id" : "shard0001",  "host" : "localhost:27015" }
    {  "_id" : "shard0002",  "host" : "localhost:27014" }
  balancer:
    Currently enabled:  yes
    Currently running:  no
    Failed balancer rounds in last 5 attempts:  0
    Migration Results for the last 24 hours:
      4 : Success
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : false,  "primary" : "shard0002" }
    {  "_id" : "twitter",  "partitioned" : true,  "primary" : "shard0002" }
      twitter.tweets
        shard key: { "user.screen_name" : 1 }
        chunks:
          shard0000  2
          shard0001  2
          shard0002  2
        { "user.screen_name" : { "$minKey" : 1 } } -->> { "user.screen_name" : "111111121111111" } on : shard0000 Timestamp(2, 0)
        { "user.screen_name" : "111111121111111" } -->> { "user.screen_name" : "YohannaCS" } on : shard0001 Timestamp(3, 0)
        { "user.screen_name" : "YohannaCS" } -->> { "user.screen_name" : "graciadelcielo" } on : shard0000 Timestamp(4, 0)
        { "user.screen_name" : "graciadelcielo" } -->> { "user.screen_name" : "nosso_surita" } on : shard0001 Timestamp(5, 0)
        { "user.screen_name" : "nosso_surita" } -->> { "user.screen_name" : "yuuki_gei" } on : shard0002 Timestamp(5, 1)
        { "user.screen_name" : "yuuki_gei" } -->> { "user.screen_name" : { "$maxKey" : 1 } } on : shard0002 Timestamp(1, 6)
```
NOTE: Sharding by a date or other sequential values is usually not a good idea, as you end up concentrating all the workload on the single shard that holds the most recent data.
In [19]:
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client.twitter
db.tweets.map_reduce(
map='''function() {
var tags = this.entities.hashtags;
for(var i=0; i<tags.length; i++)
emit(tags[i].text, 1);
}''',
reduce='''function(key, values) {
return Array.sum(values);
}''',
out='tagsfrequency'
)
print(list(
db.tagsfrequency.find({'value': {'$gt': 10}})
))
You will notice that the map_reduce
command has now been properly split across the nodes of our cluster. Our shards should report in their logs something like:
2015-06-28T12:31:58.231+0200 I COMMAND [conn4] command twitter.$cmd command: mapReduce { mapreduce: "tweets", map: "function() {
var tags = this.entities.hashtags;
for(var i=0; i<tags.length; i++)
emit(tags[i].text, 1);
}", reduce: "function(key, values) {
return Array.sum(values);
}", out: "tmp.mrs.tweets_1435487518_0", shardedFirstPass: true } ntoreturn:1 keyUpdates:0 writeConflicts:0 numYields:0 reslen:151 locks:{ Global: { acquireCount: { r: 2225, w: 1068, W: 3 } }, MMAPV1Journal: { acquireCount: { r: 575, w: 2130 } }, Database: { acquireCount: { r: 535, w: 1060, R: 42, W: 11 } }, Collection: { acquireCount: { R: 535, W: 1063 } }, Metadata: { acquireCount: { W: 8 } } } 102ms
A few operational notes on MongoDB performance:

- To avoid journal overhead, store the journal on a separate disk from the data; this lowers the journal overhead to around 3%.
- Data files can get fragmented over time if ``remove()`` and ``update()`` are issued.
- PowerOf2Allocation, the default since 2.6, is more efficient for updates and removes: each record gets a size in bytes that is a power of 2 (e.g. 32, 64, 128, 256, 512...), so updated documents usually don't need to be moved (a 200 byte document has up to 56 spare bytes before it needs to be reallocated) and a removed document leaves a slot that can be reused by another document, since all slots are rounded to the same powers of 2 (see the sketch below).
- https://github.com/10gen-labs/storage-viz helps debugging storage, RAM usage and fragmentation.
- op/sec * docsize + 40%
- https://github.com/rueckstiess/mtools helps debugging operation logs and slow replication.
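A tiny sketch of the arithmetic behind power-of-2 allocation (pure illustration, not MongoDB code): a record is padded up to the next power of two, so a 200 byte document gets a 256 byte slot with 56 bytes of headroom.
```
def power_of_two_slot(record_size):
    """Round a record size up to the next power of 2, like the allocator does."""
    slot = 32  # smallest slot considered in this example
    while slot < record_size:
        slot *= 2
    return slot

print power_of_two_slot(200), power_of_two_slot(200) - 200  # 256 56
```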
Spark
-----

An RDD (Resilient Distributed Dataset) can be considered as a huge list without any keys, e.g. RDD = [1,2,3].
To use Spark you need Java installed.
Download http://blackhole.test.axantweb.com/pyspark-1.6.1.tgz
To check that it works, extract the archive, cd into it and run ``./pyspark.sh``.
If you get an ``Exception in thread "main"`` error, the following workaround may help:
sudo rm /usr/bin/java
sudo ln -s /Library/Internet\ Plug-Ins/JavaAppletPlugin.plugin/Contents/Home/bin/java /usr/bin
To use Spark in Jupyter:
IPYTHON_OPTS="notebook" ./pyspark.sh
In [1]:
sc
Out[1]:
In [2]:
data = sc.parallelize([1,2,3,4,5,6])
In [3]:
data.first()
Out[3]:
In [4]:
def multiply(v):
return v*2
In [7]:
multdata = data.map(multiply)
print multdata
In [13]:
multdata.collect()
Out[13]:
In [9]:
sc.defaultParallelism
Out[9]:
In [12]:
values = range(20)
print values
In [19]:
rddvalues = sc.parallelize(values)
def sumall(i):
yield list(i) # returns the list on each core (4 cores available)
rddvalues.mapPartitions(sumall).collect()
Out[19]:
In [20]:
def summap(v):
return v+1
rddvalues.map(summap).collect()
Out[20]:
In [21]:
def sumtwo(a, b):
return a+b
rddvalues.reduce(sumtwo)
Out[21]:
In [23]:
low = sc.parallelize(range(1,10))
high = sc.parallelize(range(5,15))
In [24]:
low.union(high).collect()
Out[24]:
In [25]:
low.intersection(high).collect()
Out[25]:
In [27]:
low.union(high).distinct().collect()
Out[27]:
In [56]:
text = sc.textFile('./Jungle_Book.txt')
In [34]:
print text
In [57]:
text.count()
Out[57]:
In [45]:
def splitlines(line):
return line.split()
words = text.flatMap(splitlines)
In [46]:
words.count()
Out[46]:
In [47]:
words.take(15)
Out[47]:
In [107]:
def get_freq(word):
return word, 1
def get_count(a, b): # this will be used with reduceByKey
return a+b
def switch_tuple(t): # this is needed to sort by Key (flips the passed tuple)
return t[1], t[0]
print text.flatMap(splitlines).map(get_freq).reduceByKey(get_count).map(switch_tuple).sortByKey(0, 1).collect()[:20]