In [52]:
# Show which SQLAlchemy release this notebook is running against.
import sqlalchemy
sqlalchemy.__version__
Out[52]:
In [53]:
from sqlalchemy import create_engine
# The URL selects SQLite with the special ':memory:' database name,
# so no database file path is involved.
engine = create_engine('sqlite:///:memory:')
In [54]:
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

# Both tables register themselves on this one MetaData object, so a single
# create_all() call at the end covers them both.
metadata = MetaData()

# Parent table: one row per person.
user = Table(
    'user',
    metadata,
    Column('id_user', Integer, primary_key=True),
    Column('name', String),
    Column('age', Integer),
)

# Child table: every row points back at its owner through id_user.
item = Table(
    'item',
    metadata,
    Column('id_item', Integer, primary_key=True),
    Column('id_user', Integer, ForeignKey('user.id_user')),
    Column('thing', String),
)

# Create the tables registered on `metadata` using the engine.
metadata.create_all(engine)
... and let's add a couple of users. First we build the insert statement and look at the parameters it would send; actually executing it comes in the next step.
In [55]:
# Sample rows in column order: (id_user, name, age).
# Ages are ints so that they match the Integer type declared for user.age;
# passing them as strings only works when the backend coerces them.
people = [
    (1, 'Bob', 20),
    (2, 'Sally', 25),
    (3, 'John', 30),
]

# A bare INSERT construct for the user table; printing it shows the SQL text.
insert = user.insert()
print(insert)

# Bind each tuple positionally and show the parameters of the compiled
# statement. Nothing is executed against the database in this cell.
for p in people:
    stmt = insert.values(p)
    print(stmt.compile().params)
... and then actually put them into the database:
In [56]:
# Open a connection (reused by the cells that follow) and run one INSERT
# per sample person.
connection = engine.connect()
for person in people:
    connection.execute(insert.values(person))
... and pull the data back out.
In [57]:
from sqlalchemy import select

# Select every column of the user table.
users = connection.execute(select([user]))

# First the result object itself, then its rows collected into a list.
print(users)
print([row for row in users])
So the result is an iterator that yields tuple-like rows.
However, each row can also be turned into a dictionary of column name to value:
In [58]:
# Re-run the query: the previous result was already consumed by list().
users = connection.execute(select([user]))
for u in users:
    # items() gives (column name, value) pairs, which dict() accepts directly.
    print(dict(u.items()))
In [59]:
# Sample rows in column order: (id_item, id_user, thing).
items = (
    (1, 1, 'Peanuts'),
    (2, 1, 'VW'),
    (3, 1, 'iPad'),
    (4, 2, 'Raisins'),
    (5, 2, 'Fiat'),
    (6, 2, 'Nexus 10'),
    (7, 2, 'Timex'),
    (8, 3, 'Caviar'),
    (9, 3, 'Porche'),
    (10, 3, 'Surface Pro'),
    (11, 3, 'Rolex'),
    (12, 3, 'Boat'),
    (13, 3, 'Plane'),
)

# NOTE: this rebinds `insert`, which until now held user.insert(). Re-running
# the earlier people-insert cell after this one would target the item table.
insert = item.insert()
for item_row in items:
    connection.execute(insert.values(item_row))
In [60]:
# Read the item table back and print one row per line.
for row in connection.execute(select([item])):
    print(row)
Now let's join the two tables together, starting with no join condition at all:
In [61]:
# Select from both tables with no WHERE clause or join condition.
# use_labels=True is passed so the two id_user columns get distinct labels.
stmt = select([user, item], use_labels=True)
for row in connection.execute(stmt):
    print(row)
That's every combination of a user row with an item row, because we gave no join condition. But we have id_user in the second table.
Let's do a very simple filter on it:
In [62]:
# Keep only the pairs where the item's id_user matches the user's id_user.
stmt = (
    select([user, item])
    .where(user.c.id_user == item.c.id_user)
)
print(stmt)
In [63]:
# Execute the filtered statement and print each matching row.
for row in connection.execute(stmt):
    print(row)
In [66]:
from sqlalchemy import func

# Build a COUNT over user.id_user and execute it.
stmt = select(
    [func.count(user.c.id_user)]
)
result = connection.execute(stmt)

# Materialise the result rows into a tuple so they can be printed.
print(tuple(result))
That's great, but we have a convenience function as well:
In [68]:
# Same count statement as before, but ask the result for a single value
# via scalar() instead of wrapping its rows in a tuple.
result = connection.execute(stmt).scalar()
print(result)
In [71]:
# All user columns plus a count labelled 'item_count', taken over user
# joined to item and grouped by user.
# NOTE(review): user.join(item) is given no explicit ON clause - presumably it
# is derived from the ForeignKey on item.id_user; confirm.
stmt = (
    select([
        user,
        func.count(user.c.id_user).label('item_count'),
    ])
    .select_from(user.join(item))
    .group_by(user.c.id_user)
)
print(stmt)
In [73]:
# Print each grouped row as its (column name, value) pairs.
for row in connection.execute(stmt):
    print(row.items())
In [76]:
# Case-insensitive match of item.thing against 'boat'; select only the
# id_user column of the matching rows.
stmt1 = (
    select([item.c.id_user])
    .where(item.c.thing.ilike('boat'))
)
print(connection.execute(stmt1).fetchone())
In [77]:
# Use the id_user query (stmt1) directly as the subquery of an IN comparison.
# in_() takes a select() construct as-is; wrapping it in .alias() first turns
# it into a named FROM-clause object, which an IN comparison does not need and
# which later SQLAlchemy releases warn about.
stmt2 = select([user]).where(user.c.id_user.in_(stmt1))
print(stmt2)
In [78]:
# Execute the IN-subquery statement and print one row of its result.
print(connection.execute(stmt2).fetchone())