In [116]:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy import text
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy import ForeignKey
from sqlalchemy import Table, Text
from sqlalchemy.sql import exists
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from sqlalchemy.orm import aliased
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import contains_eager
from sqlalchemy.ext.declarative import declarative_base
## Version Check
sqlalchemy.__version__
Out[116]:
In [117]:
# Engine for a throwaway in-memory SQLite database; echo=True makes
# SQLAlchemy log the SQL statements it emits.
engine = create_engine('sqlite:///:memory:', echo=True)
In [118]:
# Declarative base class; the mapped classes defined below inherit from it.
Base = declarative_base()
In [119]:
# Session factory whose sessions talk to the engine created above.
Session = sessionmaker(bind=engine)
In [120]:
# The single working session used by the cells that follow.
session = Session()
In [6]:
class User(Base):
    """ORM mapping for one row of the ``users`` table."""

    __tablename__ = 'users'

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Three plain string columns describing the user.
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)

    def __repr__(self):
        """Return a readable summary showing name, fullname and nickname."""
        template = "<User(name='{}', fullname='{}', nickname='{}')>"
        return template.format(self.name, self.fullname, self.nickname)
In [7]:
# Display the Table object that the declarative mapping built for User.
User.__table__
Out[7]:
In [8]:
# Create the tables registered on Base.metadata in the database behind `engine`.
Base.metadata.create_all(engine)
In [9]:
# Build a User object in memory; it is not attached to the session yet.
ed_user = User(name='ed', fullname='Ed Jones', nickname='ednickname')
In [10]:
# Hand ed_user to the session so it is written out on the next flush.
session.add(ed_user)
In [11]:
# Fetch the first user named 'ed' (first() gives None when nothing matches).
our_user = session.query(User).filter_by(name='ed').first()
In [12]:
# Identity check: is the queried object the very same Python object added above?
ed_user is our_user
Out[12]:
In [13]:
# Queue three more new users in a single call.
session.add_all([
User(name='wendy', fullname='Wendy Williams', nickname='windy'),
User(name='mary', fullname='Mary Contrary', nickname='mary'),
User(name='fred', fullname='Fred Flintstone', nickname='freddy')
])
In [14]:
# Modify an attribute on an object the session already tracks.
ed_user.nickname = 'eddie'
In [15]:
# Show the objects the session considers modified.
session.dirty
Out[15]:
In [16]:
# Show the objects that were added to the session but are still pending.
session.new
Out[16]:
In [17]:
# Flush the outstanding changes and commit the transaction.
session.commit()
In [18]:
# Read ed_user's primary key now that the row has been committed.
ed_user.id
Out[18]:
In [19]:
# First uncommitted change: rename ed_user (undone by the rollback further down).
ed_user.name = 'Edwardo'
In [20]:
# Second uncommitted change: add a user that the rollback further down discards.
fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
session.add(fake_user)
In [21]:
# Look up both uncommitted changes by name inside the current transaction.
session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all()
Out[21]:
In [22]:
# Throw away everything done since the last commit.
session.rollback()
In [23]:
# Read the name again now that the rename has been rolled back.
ed_user.name
Out[23]:
In [24]:
# Check whether the session still tracks fake_user after the rollback.
fake_user in session
Out[24]:
In [25]:
# Query by the original name 'ed' and by 'fakeuser' after the rollback.
session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all()
Out[25]:
In [26]:
# Walk over every user in primary-key order and print two of its attributes.
users_by_id = session.query(User).order_by(User.id)
for instance in users_by_id:
    print(instance.name, instance.fullname)
In [27]:
# Select only two columns; each result row unpacks into (name, fullname).
name_fullname_rows = session.query(User.name, User.fullname)
for name, fullname in name_fullname_rows:
    print(name, fullname)
In [28]:
# Query a whole entity together with one column; the row exposes each part
# as an attribute (row.User, row.name).
entity_and_name_rows = session.query(User, User.name).all()
for row in entity_and_name_rows:
    print(row.User, row.name)
In [29]:
# Give the selected column a custom label and read it back under that name.
labeled_rows = session.query(User.name.label('name_label')).all()
for row in labeled_rows:
    print(row.name_label)
In [30]:
# Alias of the User entity, named 'user_alias' so result rows expose it under that name.
user_alias = aliased(User, name='user_alias')
In [31]:
# Query through the alias; the entity is reachable on each row as row.user_alias.
aliased_rows = session.query(user_alias, user_alias.name).all()
for row in aliased_rows:
    print(row.user_alias)
In [32]:
# Slice the ordered query to pick the second and third users by id.
second_and_third_users = session.query(User).order_by(User.id)[1:3]
for u in second_and_third_users:
    print(u)
In [33]:
# filter_by() takes keyword arguments; each result is a 1-tuple, hence (name,).
names_by_keyword = session.query(User.name).filter_by(fullname='Ed Jones')
for (name,) in names_by_keyword:
    print(name)
In [34]:
# The same lookup written with filter() and a column comparison expression.
names_by_expression = session.query(User.name).filter(User.fullname == 'Ed Jones')
for (name,) in names_by_expression:
    print(name)
In [35]:
# Chain two filter() calls; both criteria must hold for a row to be returned.
ed_jones_query = (
    session.query(User)
    .filter(User.name == 'ed')
    .filter(User.fullname == 'Ed Jones')
)
for user in ed_jones_query:
    print(user)
In [36]:
# Users whose name ends in 'ed', ordered by id; all() returns them as a list.
query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
query.all()
Out[36]:
In [37]:
# Take only the first row of the same query.
query.first()
Out[37]:
In [38]:
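# ERROR: one() requires exactly one matching row, but the LIKE '%ed' query
# above can match several users, so this raises MultipleResultsFound.
# user = query.one()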
In [39]:
# Rebind `query` to select only the id of the user named 'ed'.
query = session.query(User.id).filter(User.name == 'ed').order_by(User.id)
In [40]:
# Return the first column of the single result row as a plain value.
query.scalar()
Out[40]:
In [41]:
# Use literal SQL fragments, wrapped in text(), for the WHERE and ORDER BY parts.
low_id_users = (
    session.query(User)
    .filter(text("id < 10"))
    .order_by(text("id"))
    .all()
)
for user in low_id_users:
    print(user.name)
In [42]:
# Textual filter with :value and :name bind parameters supplied through params();
# one() expects exactly one matching row.
session.query(User).filter(text("id < :value and name = :name")).params(value = 10, name = 'fred').order_by(User.id).one()
Out[42]:
In [43]:
# Run a complete hand-written SELECT and load its rows as User objects.
session.query(User).from_statement(text(
"SELECT * FROM users WHERE name = :name"
)).params(name = 'ed').all()
Out[43]:
In [44]:
# Textual SELECT whose columns are listed in a custom order; columns() matches
# them positionally to the mapped columns so the rows still load as User objects.
stmt = text("SELECT name, id, fullname, nickname FROM users WHERE name = :name")
stmt = stmt.columns(User.name, User.id, User.fullname, User.nickname)
session.query(User).from_statement(stmt).params(name='ed').all()
Out[44]:
In [45]:
# Same technique, but selecting and returning only two individual columns.
stmt = text("SELECT name, id FROM users WHERE name = :name")
stmt = stmt.columns(User.name, User.id)
session.query(User.id, User.name).from_statement(stmt).params(name='ed').all()
Out[45]:
In [46]:
# Count the users whose name ends in 'ed'.
session.query(User).filter(User.name.like('%ed')).count()
Out[46]:
In [47]:
# Count how many rows share each distinct name.
session.query(func.count(User.name), User.name).group_by(User.name).all()
Out[47]:
In [48]:
# count(*) names no column, so select_from() states which table to count.
session.query(func.count('*')).select_from(User).scalar()
Out[48]:
In [49]:
# Counting a mapped column directly makes select_from() unnecessary.
session.query(func.count(User.id)).scalar()
Out[49]:
In [50]:
class Address(Base):
    """ORM mapping for one row of the ``addresses`` table."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    # The e-mail address itself is mandatory.
    email_address = Column(String, nullable=False)
    # Foreign key to users.id.
    user_id = Column(Integer, ForeignKey('users.id'))

    # Paired with the `addresses` relationship on User via back_populates.
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a readable summary showing the e-mail address."""
        template = "<Address(email_address={})>"
        return template.format(self.email_address)
In [51]:
# Attach the reverse side of Address.user to the already-declared User class;
# the collection is ordered by Address.id.
User.addresses = relationship("Address", order_by=Address.id, back_populates="user")
In [52]:
# Run create_all again so the newly declared addresses table gets created.
Base.metadata.create_all(engine)
In [53]:
# Create a new user and look at its addresses collection before anything is assigned.
jack = User(name='jack', fullname='Jack Bean', nickname='jacknick')
jack.addresses
Out[53]:
In [54]:
# Give jack two Address objects by assigning a list to the relationship.
jack.addresses = [Address(email_address='jack@gmail.com'),
Address(email_address='jack@yahoo.com')]
In [55]:
# Look at the second address in the collection.
jack.addresses[1]
Out[55]:
In [56]:
# Follow the relationship from that address back to its user.
jack.addresses[1].user
Out[56]:
In [57]:
# Add jack to the session and commit.
session.add(jack)
session.commit()
In [58]:
# Query jack back by name; one() expects exactly one match.
jack = session.query(User).filter_by(name='jack').one()
jack
Out[58]:
In [59]:
# Access the addresses collection of the reloaded user.
jack.addresses
Out[59]:
In [60]:
# Implicit join: select both entities and relate them in the WHERE clause.
user_address_pairs = (
    session.query(User, Address)
    .filter(User.id == Address.user_id)
    .filter(Address.email_address == 'jack@gmail.com')
    .all()
)
for u, a in user_address_pairs:
    print(u)
    print(a)
In [61]:
# Explicit JOIN from User to Address, filtered on the address's e-mail.
session.query(User).join(Address).filter(Address.email_address == 'jack@gmail.com').all()
Out[61]:
In [62]:
# Two independent aliases of Address so the table can be joined twice in one query.
adalias1 = aliased(Address)
adalias2 = aliased(Address)
In [63]:
# Join addresses twice, once per alias, to find users that own both e-mail addresses.
both_addresses_rows = (
    session.query(User.name, adalias1.email_address, adalias2.email_address)
    .join(adalias1, User.addresses)
    .join(adalias2, User.addresses)
    .filter(adalias1.email_address == 'jack@gmail.com')
    .filter(adalias2.email_address == 'jack@yahoo.com')
)
for username, email1, email2 in both_addresses_rows:
    print(username, email1, email2)
In [64]:
# Subquery with one row per user_id and the number of addresses, labelled address_count.
stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()
In [65]:
# Pair every user with its address count. The subquery is grouped by
# Address.user_id, so the join has to match users on stmt.c.user_id;
# comparing User.id with the count itself would attach counts to the wrong users.
# Because this is an OUTER join, users without addresses are kept with count None.
users_with_counts = (
    session.query(User, stmt.c.address_count)
    .outerjoin(stmt, User.id == stmt.c.user_id)
    .order_by(User.id)
)
for u, count in users_with_counts:
    print(u, count)
In [66]:
# Subquery over every address except 'jack@yahoo.com'.
stmt = session.query(Address).filter(Address.email_address != 'jack@yahoo.com').subquery()
In [67]:
# Map the Address entity onto that subquery so its rows load as Address objects.
adalias = aliased(Address, stmt)
In [68]:
# Join users to the subquery-backed alias and print each (user, address) pair.
user_and_filtered_address = session.query(User, adalias).join(adalias, User.addresses)
for user, address in user_and_filtered_address:
    print(user)
    print(address)
In [69]:
# EXISTS criterion that holds for a user with at least one address row.
stmt = exists().where(Address.user_id == User.id)
In [70]:
# Names of the users that satisfy the EXISTS criterion held in stmt.
names_with_address = session.query(User.name).filter(stmt)
for (name,) in names_with_address:
    print(name)
In [71]:
# The same question asked through the relationship: any() with no criterion.
names_with_any_address = session.query(User.name).filter(User.addresses.any())
for (name,) in names_with_any_address:
    print(name)
In [72]:
# any() with a criterion: users owning an address that contains 'gmail'.
has_gmail_address = User.addresses.any(Address.email_address.like('%gmail%'))
for (name,) in session.query(User.name).filter(has_gmail_address):
    print(name)
In [73]:
# has() is the many-to-one counterpart of any(); ~ negates it, selecting
# addresses whose user is not named 'jack'.
session.query(Address).filter(~Address.user.has(User.name == 'jack')).all()
Out[73]:
In [74]:
# Load jack with the selectinload option applied to User.addresses.
jack = session.query(User).options(selectinload(User.addresses)).filter_by(name='jack').one()
In [75]:
# Display the user loaded with selectinload.
jack
Out[75]:
In [76]:
# Display the addresses collection of the user loaded with selectinload.
jack.addresses
Out[76]:
In [77]:
# Load jack again, this time with the joinedload option on User.addresses.
jack = session.query(User).options(joinedload(User.addresses)).filter_by(name='jack').one()
In [78]:
# Display the user loaded with joinedload.
jack
Out[78]:
In [79]:
# Display the addresses collection of the user loaded with joinedload.
jack.addresses
Out[79]:
In [80]:
# Join Address to its user explicitly, filter on the user's name, and let
# contains_eager() populate Address.user from the columns of that same join.
jack_addresses = (
    session.query(Address)
    .join(Address.user)
    .filter(User.name == 'jack')
    .options(contains_eager(Address.user))
    .all()
)
In [81]:
# Display the list of addresses returned by the contains_eager query.
jack_addresses
Out[81]:
In [82]:
# Read the user of the first returned address.
jack_addresses[0].user
Out[82]:
In [83]:
# Mark jack for deletion, then count the users still named 'jack'.
session.delete(jack)
session.query(User).filter_by(name='jack').count()
Out[83]:
In [84]:
# Count jack's two addresses after the delete; at this point User.addresses
# was declared without any cascade option.
session.query(Address).filter(Address.email_address.in_(['jack@gmail.com', 'jack@yahoo.com'])).count()
Out[84]:
In [85]:
# Close the session before the mappings are declared afresh.
session.close()
In [121]:
# Fresh declarative base so that User and Address can be declared again below.
Base = declarative_base()
In [122]:
class User(Base):
    """ORM mapping for the ``users`` table, re-declared with a cascading ``addresses`` relationship."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)

    # The cascade setting lets deletes on a User reach its Address objects and
    # also deletes addresses that get detached from the collection.
    addresses = relationship(
        'Address',
        back_populates='user',
        cascade='all, delete, delete-orphan',
    )

    def __repr__(self):
        """Return a readable summary showing name, fullname and nickname."""
        template = "<User(name='{}', fullname='{}', nickname='{}')>"
        return template.format(self.name, self.fullname, self.nickname)
In [123]:
class Address(Base):
    """ORM mapping for the ``addresses`` table, re-declared on the new base."""

    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    # The e-mail address itself is mandatory.
    email_address = Column(String, nullable=False)
    # Foreign key to users.id.
    user_id = Column(Integer, ForeignKey('users.id'))

    # Paired with User.addresses via back_populates.
    user = relationship("User", back_populates="addresses")

    def __repr__(self):
        """Return a readable summary showing the quoted e-mail address."""
        template = "<Address(email_address='{}')>"
        return template.format(self.email_address)
In [124]:
class BlogPost(Base):
    """A blog post with a headline, an optional body, an author and keywords."""

    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # many-to-one BlogPost-->User, paired with a `posts` relationship on User
    author = relationship(User, back_populates='posts')

    # many-to-many BlogPost<-->Keyword
    # The association table is referenced by its table name as a string: the
    # `post_keywords` Table object is only created further down in the notebook,
    # so naming the variable here would raise NameError on a fresh kernel.
    # SQLAlchemy looks the name up in Base.metadata when the mappers are configured.
    keywords = relationship('Keyword', secondary='post_keywords', back_populates='posts')

    def __init__(self, headline, body, author):
        """Create a post from its headline, body text and authoring User."""
        self.author = author
        self.headline = headline
        self.body = body

    def __repr__(self):
        """Return a readable summary showing headline, body and author."""
        return "BlogPost({}, {}, {})".format(self.headline, self.body, self.author)
In [125]:
# Reverse side of BlogPost.author, attached to User after the fact and
# configured with the "dynamic" loading strategy.
User.posts = relationship(BlogPost, back_populates="author", lazy="dynamic")
In [126]:
class Keyword(Base):
    """A keyword (at most 50 characters, unique) that blog posts can be tagged with."""

    __tablename__ = 'keywords'

    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)

    # many-to-many Keyword<-->BlogPost
    # The association table is referenced by its table name as a string: the
    # `post_keywords` Table object is only created further down in the notebook,
    # so naming the variable here would raise NameError on a fresh kernel.
    # SQLAlchemy looks the name up in Base.metadata when the mappers are configured.
    posts = relationship('BlogPost', secondary='post_keywords', back_populates='keywords')

    def __init__(self, keyword):
        """Create a keyword from its text."""
        self.keyword = keyword
In [127]:
# Association table for the many-to-many link between posts and keywords.
# It holds only the two foreign keys, which together form the primary key.
post_keywords = Table('post_keywords', Base.metadata,
Column('post_id', ForeignKey('posts.id'), primary_key=True),
Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
)
In [128]:
# Create the tables registered on the new Base.metadata in the same engine.
Base.metadata.create_all(engine)
In [ ]: