SQLAlchemy Core Examples

This file contains SQLAlchemy core examples.

Check that SQLAlchemy is installed, and see which version we have:


In [52]:
# Importing the package and echoing its version string confirms it is available.
import sqlalchemy
sqlalchemy.__version__


Out[52]:
'0.9.9'

Create an SQLite engine backed by an in-memory database


In [53]:
from sqlalchemy import create_engine
# The ':memory:' path in the URL selects an in-memory SQLite database.
engine = create_engine('sqlite:///:memory:')

Now let's make a couple of tables and do some queries


In [54]:
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
# Both table definitions below are registered on this MetaData object.
metadata = MetaData()
# One row per person; id_user is the primary key.
user = Table('user', metadata,
    Column('id_user', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer))
# One row per owned thing; id_user is a foreign key pointing at user.id_user.
item = Table('item', metadata,
    Column('id_item', Integer, primary_key=True),
    Column('id_user', Integer, ForeignKey('user.id_user')),
    Column('thing', String))

# Create the tables registered on metadata in the database behind engine.
metadata.create_all(engine)

... and let's add a few users. First we build the insert statement and look at the parameters it would be executed with.


In [55]:
# Sample rows as (id_user, name, age) tuples. The ages are written as strings
# here and appear unchanged in the compiled parameters printed by this cell.
people = [
    (1, 'Bob', '20'),
    (2, 'Sally', '25'),
    (3, 'John', '30'),
]
insert = user.insert()
print(insert)
# Bind each tuple to the statement without executing it, to inspect the parameters.
for person in people:
    print(insert.values(person).compile().params)


INSERT INTO "user" (id_user, name, age) VALUES (:id_user, :name, :age)
{'age': '20', 'id_user': 1, 'name': 'Bob'}
{'age': '25', 'id_user': 2, 'name': 'Sally'}
{'age': '30', 'id_user': 3, 'name': 'John'}

... and then actually put them into the database:


In [56]:
# Open a connection on the engine; the following cells reuse it for every query.
connection = engine.connect()
# Execute one INSERT per sample row.
for p in people:
    connection.execute(insert.values(p))

... and pull the data back out.


In [57]:
from sqlalchemy import select
# Select every column of the user table.
users = connection.execute(select([user]))
# Printing the result object itself only shows its repr ...
print(users)
# ... while list() iterates it and collects the rows.
print(list(users))


<sqlalchemy.engine.result.ResultProxy object at 0x7ffcb430ca20>
[(1, 'Bob', 20), (2, 'Sally', 25), (3, 'John', 30)]

So the result is an iterator which yields tuples.

However, we can get a list of dictionaries from the iterator:


In [58]:
users = connection.execute(select([user]))
# items() yields (column name, value) pairs for a row; dict() turns them into a mapping.
for row in users:
    print(dict(row.items()))


{'age': 20, 'id_user': 1, 'name': 'Bob'}
{'age': 25, 'id_user': 2, 'name': 'Sally'}
{'age': 30, 'id_user': 3, 'name': 'John'}

Let's add some item data


In [59]:
# Sample rows as (id_item, id_user, thing) tuples; id_user refers to the users inserted earlier.
items = (
    (1, 1, 'Peanuts'),
    (2, 1, 'VW'),
    (3, 1, 'iPad'),
    (4, 2, 'Raisins'),
    (5, 2, 'Fiat'),
    (6, 2, 'Nexus 10'),
    (7, 2, 'Timex'),
    (8, 3, 'Caviar'),
    (9, 3, 'Porche'),
    (10, 3, 'Surface Pro'),
    (11, 3, 'Rolex'),
    (12, 3, 'Boat'),
    (13, 3, 'Plane'))
# NOTE: this rebinds `insert`, which previously held the insert for the user table.
insert = item.insert()
# Execute one INSERT per item row.
for i in items:
    connection.execute(insert.values(i))

In [60]:
# Read the item table back to confirm the inserts.
for x in connection.execute(select([item])):
    print(x)


(1, 1, 'Peanuts')
(2, 1, 'VW')
(3, 1, 'iPad')
(4, 2, 'Raisins')
(5, 2, 'Fiat')
(6, 2, 'Nexus 10')
(7, 2, 'Timex')
(8, 3, 'Caviar')
(9, 3, 'Porche')
(10, 3, 'Surface Pro')
(11, 3, 'Rolex')
(12, 3, 'Boat')
(13, 3, 'Plane')

Now let's join the two tables together:


In [61]:
# No WHERE clause or join condition is given here, so every user row is paired
# with every item row.
stmt = select([user, item], use_labels=True)
for s in connection.execute(stmt):
    print(s)
# Alternative output that shows the column names alongside the values:
#     print(s.items())


(1, 'Bob', 20, 1, 1, 'Peanuts')
(1, 'Bob', 20, 2, 1, 'VW')
(1, 'Bob', 20, 3, 1, 'iPad')
(1, 'Bob', 20, 4, 2, 'Raisins')
(1, 'Bob', 20, 5, 2, 'Fiat')
(1, 'Bob', 20, 6, 2, 'Nexus 10')
(1, 'Bob', 20, 7, 2, 'Timex')
(1, 'Bob', 20, 8, 3, 'Caviar')
(1, 'Bob', 20, 9, 3, 'Porche')
(1, 'Bob', 20, 10, 3, 'Surface Pro')
(1, 'Bob', 20, 11, 3, 'Rolex')
(1, 'Bob', 20, 12, 3, 'Boat')
(1, 'Bob', 20, 13, 3, 'Plane')
(2, 'Sally', 25, 1, 1, 'Peanuts')
(2, 'Sally', 25, 2, 1, 'VW')
(2, 'Sally', 25, 3, 1, 'iPad')
(2, 'Sally', 25, 4, 2, 'Raisins')
(2, 'Sally', 25, 5, 2, 'Fiat')
(2, 'Sally', 25, 6, 2, 'Nexus 10')
(2, 'Sally', 25, 7, 2, 'Timex')
(2, 'Sally', 25, 8, 3, 'Caviar')
(2, 'Sally', 25, 9, 3, 'Porche')
(2, 'Sally', 25, 10, 3, 'Surface Pro')
(2, 'Sally', 25, 11, 3, 'Rolex')
(2, 'Sally', 25, 12, 3, 'Boat')
(2, 'Sally', 25, 13, 3, 'Plane')
(3, 'John', 30, 1, 1, 'Peanuts')
(3, 'John', 30, 2, 1, 'VW')
(3, 'John', 30, 3, 1, 'iPad')
(3, 'John', 30, 4, 2, 'Raisins')
(3, 'John', 30, 5, 2, 'Fiat')
(3, 'John', 30, 6, 2, 'Nexus 10')
(3, 'John', 30, 7, 2, 'Timex')
(3, 'John', 30, 8, 3, 'Caviar')
(3, 'John', 30, 9, 3, 'Porche')
(3, 'John', 30, 10, 3, 'Surface Pro')
(3, 'John', 30, 11, 3, 'Rolex')
(3, 'John', 30, 12, 3, 'Boat')
(3, 'John', 30, 13, 3, 'Plane')

That's every combination of rows from both tables (3 users × 13 items = 39 rows), because we gave no join condition. But we have id_user in the second table.

Let's do a very simple filter:


In [62]:
# Keep only the combinations where the item's id_user matches the user's id_user.
stmt = select([user, item]).where(user.c.id_user == item.c.id_user)
# Printing a statement shows the SQL it compiles to.
print(stmt)


SELECT "user".id_user, "user".name, "user".age, item.id_item, item.id_user, item.thing 
FROM "user", item 
WHERE "user".id_user = item.id_user

In [63]:
# Run the filtered statement; each row is a user together with one of their items.
for s in connection.execute(stmt):
    print(s)


(1, 'Bob', 20, 1, 1, 'Peanuts')
(1, 'Bob', 20, 2, 1, 'VW')
(1, 'Bob', 20, 3, 1, 'iPad')
(2, 'Sally', 25, 4, 2, 'Raisins')
(2, 'Sally', 25, 5, 2, 'Fiat')
(2, 'Sally', 25, 6, 2, 'Nexus 10')
(2, 'Sally', 25, 7, 2, 'Timex')
(3, 'John', 30, 8, 3, 'Caviar')
(3, 'John', 30, 9, 3, 'Porche')
(3, 'John', 30, 10, 3, 'Surface Pro')
(3, 'John', 30, 11, 3, 'Rolex')
(3, 'John', 30, 12, 3, 'Boat')
(3, 'John', 30, 13, 3, 'Plane')

What about SQL functions?


In [66]:
from sqlalchemy import func
# func.count(...) renders a SQL count() over the user ids.
stmt = select([func.count(user.c.id_user)])
result = connection.execute(stmt)
# The result is still a row iterator: one row holding one column.
print(tuple(result))


((3,),)

That's great, but we have a convenience function as well:


In [68]:
# scalar() returns the single value directly instead of a row wrapped in an iterator.
result = connection.execute(stmt).scalar()
print(result)


3

Group by


In [71]:
# Join user to item and group by user. Each joined row corresponds to one item,
# so counting user.id_user within a group gives that user's number of items.
# user.join(item) is given no explicit ON clause; the printed SQL shows the
# condition "user".id_user = item.id_user being used.
stmt = (select([user, func.count(user.c.id_user).label('item_count')])
        .select_from(user.join(item))
        .group_by(user.c.id_user))
print(stmt)


SELECT "user".id_user, "user".name, "user".age, count("user".id_user) AS item_count 
FROM "user" JOIN item ON "user".id_user = item.id_user GROUP BY "user".id_user

In [73]:
# items() shows each row as (column name, value) pairs, including the item_count label.
for s in connection.execute(stmt):
    print(s.items())


[('id_user', 1), ('name', 'Bob'), ('age', 20), ('item_count', 3)]
[('id_user', 2), ('name', 'Sally'), ('age', 25), ('item_count', 4)]
[('id_user', 3), ('name', 'John'), ('age', 30), ('item_count', 6)]

A final 'fun' query


In [76]:
# ilike() is a case-insensitive LIKE, so the pattern 'boat' matches the stored 'Boat'.
stmt1 = select([item.c.id_user]).where(item.c.thing.ilike('boat'))
# fetchone() returns just the first matching row.
print(connection.execute(stmt1).fetchone())


(3,)

In [77]:
# Use stmt1 as a subquery: keep the users whose id_user is among the ids it returns.
stmt2 = select([user]).where(user.c.id_user.in_(stmt1.alias()))
print(stmt2)


SELECT "user".id_user, "user".name, "user".age 
FROM "user" 
WHERE "user".id_user IN (SELECT item.id_user 
FROM item 
WHERE lower(item.thing) LIKE lower(:thing_1))

In [78]:
# Run the combined query and show the first matching user.
print(connection.execute(stmt2).fetchone())


(3, 'John', 30)

Okay, enough of Core - now on to the ORM