Welcome to the Python native driver tutorial. This tutorial will be constantly evolving as feedback is incorporated. Pull requests are encouraged! This is fully open source, so feel free to fork it and deliver it at other meetups!
Check out the documentation and the GitHub repo.
We're going to assume a keyspace named tutorial exists.
In a CQL shell, we can do this:
CREATE KEYSPACE IF NOT EXISTS tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
We'll create the rest of the tables programmatically.
In [ ]:
%load_ext cql
In [ ]:
%cql DROP KEYSPACE IF EXISTS tutorial;
%cql CREATE KEYSPACE IF NOT EXISTS tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
In [3]:
# here we set up our database connection
from cassandra.cluster import Cluster
cluster = Cluster(["127.0.0.1"])
# a session manages the connection pool for us
session = cluster.connect("tutorial")
print "Connected"
In [ ]:
# basic table setup
tables = ["photo", "comment"]
# drop the existing ones. can't use a placeholder for
# DROP TABLE so we just use python string interpolation
for table in tables:
    session.execute("DROP TABLE IF EXISTS %s" % table)
In [8]:
photo = """
CREATE TABLE photo (
photo_id uuid,
name text,
PRIMARY KEY (photo_id)
)
"""
comment = """
CREATE TABLE comment (
photo_id uuid,
comment_id timeuuid,
comment text,
PRIMARY KEY (photo_id, comment_id)
) WITH CLUSTERING ORDER BY (comment_id DESC)
"""
session.execute(photo)
session.execute(comment)
In [9]:
import uuid
insert = session.prepare("INSERT INTO photo (photo_id, name) VALUES (?, ?)")
for x in range(100):
session.execute(insert, (uuid.uuid4(), "test %d" % x))
for photo in session.execute("SELECT * from photo limit 5"):
print photo
In [ ]:
# let's get a nice visual of some sensor data!
session.execute("DROP TABLE IF EXISTS sensor_data")
sensor_table = """
CREATE TABLE sensor_data (
sensor_id uuid,
created_at timeuuid,
reading int,
PRIMARY KEY (sensor_id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC)
"""
session.execute(sensor_table)
from uuid import uuid1, uuid4
from random import randint
insert = session.prepare("INSERT INTO sensor_data (sensor_id, created_at, reading) VALUES (?, ?, ?)")
sid = uuid4()
for x in range(100):
    # uuid1() gives us a time-based uuid, which maps to Cassandra's timeuuid type
    session.execute(insert, (sid, uuid1(), randint(1, 1000)))
The Python driver works best when you take advantage of its asynchronous features.
First, we'll insert our sensor data. Here we'll use a callback to insert some readings after we've created the initial sensor entry.
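Before diving in, here's a minimal sketch (not part of the original notebook flow) of the basic async API: execute_async returns a ResponseFuture immediately, and you can either block on it with result() or attach a callback and errback.
In [ ]:
# illustrative sketch of ResponseFuture usage; assumes the photo table from above exists
future = session.execute_async("SELECT * FROM photo LIMIT 1")
rows = future.result()  # blocks until the query finishes
for row in rows:
    print row
def handle_success(rows):
    for row in rows:
        print "async result:", row
def handle_error(exc):
    print "query failed:", exc
future = session.execute_async("SELECT * FROM photo LIMIT 1")
future.add_callbacks(handle_success, handle_error)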
In [10]:
from datetime import datetime
from random import randint
session.execute("TRUNCATE sensor_data")
session.execute("DROP TABLE IF EXISTS sensor")
sensor_table = """
CREATE TABLE sensor (
sensor_id uuid,
name text,
created_at timestamp,
PRIMARY KEY (sensor_id)
)
"""
session.execute(sensor_table)
"""
from earlier defined table:
sensor_data:
sensor_id uuid,
created_at timeuuid,
reading int,
"""
Out[10]:
In [13]:
insert_sensor = session.prepare("""INSERT INTO sensor (sensor_id, name, created_at)
                                   VALUES (?, ?, ?)""")
# the callback fires once the sensor row has been written
def create_sensor_entries_callback(response, sensor_id):
    print "CALLBACK"
for x in range(10):
    sensor_id = uuid.uuid4()
    sensor_data = (sensor_id, "sensor %d" % x, datetime.now())
    future = session.execute_async(insert_sensor, sensor_data)
    future.add_callback(create_sensor_entries_callback, sensor_id)
In [12]:
insert_sensor = session.prepare("""INSERT INTO sensor (sensor_id, name, created_at)
                                   VALUES (?, ?, ?)""")
insert_sensor_data = session.prepare("""INSERT INTO sensor_data (sensor_id, created_at, reading)
                                        VALUES (?, ?, ?)""")
# CALLBACK: for each sensor we're going to create 10 sensor data entries
def create_sensor_entries_callback(response, sensor_id):
    print "CALLBACK"
    for x in range(10):
        session.execute_async(insert_sensor_data, (sensor_id, uuid1(), randint(1, 1000)))
futures = []
sensor_ids = []
for x in range(10):
    sensor_id = uuid.uuid4()
    print sensor_id
    future = session.execute_async(insert_sensor, (sensor_id, "sensor %d" % x, datetime.now()))
    future.add_callback(create_sensor_entries_callback, sensor_id)
    futures.append(future)
    sensor_ids.append([sensor_id])  # saved for later as a list of single-element parameter lists
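If you'd rather block until the initial sensor inserts finish instead of relying on the next cell as a workaround, you can wait on each ResponseFuture. A minimal sketch (note the inserts fired from inside the callbacks are still asynchronous):
In [ ]:
# wait for each of the initial sensor inserts to complete
for future in futures:
    future.result()
print "initial sensor inserts done"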
In [ ]:
print "This is to clear the rest of the callbacks (ipython notebook issue)"
In [ ]:
from cassandra.concurrent import execute_concurrent_with_args
select_statement = session.prepare("""SELECT * FROM sensor_data WHERE sensor_id=?
ORDER BY created_at DESC LIMIT 1""")
print "Sensor IDS:", sensor_ids
result = execute_concurrent_with_args(session, select_statement, sensor_ids)
for x in result:
print "result:", x
In [ ]:
from cassandra.query import SimpleStatement
statement = SimpleStatement("SELECT * from sensor_data WHERE sensor_id=%s LIMIT 1")
result = session.execute(statement, sensor_ids[0], trace=True)
for event in statement.trace.events:
    print event.source_elapsed, event.description
The native driver manages the connection pool for you, but by default it uses a RoundRobinPolicy to pick which server it talks to. In your cluster, this means it'll talk to every machine in every datacenter. That's fine if you only have one datacenter, but most of the time you'll want to talk only to machines in the datacenter you're in. For further reading, check out the load balancing policies section of the native driver docs.
In [ ]:
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
# load local_dc string from environment or config in a real codebase
policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='US_EAST'))
token_aware_cluster = Cluster(['127.0.0.1'], load_balancing_policy=policy)
In [ ]:
user_table = """
CREATE TABLE user (
user_id int primary key,
name text
)
"""
session.execute("DROP TABLE IF EXISTS user")
session.execute(user_table)
print "User table created: " , user_table
In [ ]:
insert = "INSERT INTO user (user_id, name) values (1, 'jon') IF NOT EXISTS"
# what if this ran a millisecond after we just put steve in the system?
insert2 = "INSERT INTO user (user_id, name) values (1, 'steve') IF NOT EXISTS"
print "first query: " , session.execute(insert)
print "second query: " , session.execute(insert2)
In [ ]:
update_lwt = "UPDATE user SET name = 'steve' where user_id = 1 IF name = 'jon'"
print "update result: ", session.execute(update_lwt)
update_lwt = "UPDATE user SET name = 'joe' where user_id = 1 IF name = 'jon'"
print "update result: ", session.execute(update_lwt)
User-defined types allow you to create complex data structures in Cassandra.
In [ ]:
session.execute("DROP TABLE IF EXISTS users")
session.execute("DROP TYPE IF EXISTS address")
session.execute("CREATE TYPE tutorial.address (street text, zipcode int)")
session.execute("CREATE TABLE users (id int PRIMARY KEY, location frozen<address>)")
cluster = Cluster() # dirty fix for issue with ipython & async
session = cluster.connect("tutorial")
# create a class to map to the "address" UDT
class Address(object):
    def __init__(self, street, zipcode):
        self.street = street
        self.zipcode = zipcode
print cluster.metadata.keyspaces['tutorial']
cluster.register_user_type('tutorial', 'address', Address)
# insert a row using an instance of Address
session.execute("INSERT INTO users (id, location) VALUES (%s, %s)",
(0, Address("123 Main St.", 78723)))
# results will include Address instances
results = session.execute("SELECT * FROM users")
row = results[0]
print row.id, row.location.street, row.location.zipcode
In [ ]:
session.execute("CREATE TABLE geo_location (user_id uuid, ts timeuuid, geo frozen<tuple<float,float>>, primary key (user_id, ts))")
In [ ]:
user_id = uuid4()
ts = uuid1()
session.execute("TRUNCATE geo_location")
session.execute("""INSERT INTO geo_location (user_id, ts, geo)
VALUES (%s, %s, (%s, %s))""",
(user_id, ts, 1.0, 2.0))
print session.execute("SELECT * from geo_location")[0]
In [ ]: