In [1]:
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import relationship, backref
# connect to database
engine = create_engine('sqlite:///:memory:', echo = False)
# create base class for ORM
Base = declarative_base()
# create database schema
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key = True)
name = Column(String)
fullname = Column(String)
password = Column(String)
def __repr__(self):
return '<User (id = {0}, name = {1}, fullname = {2}, passowrd = {3})>'.format(self.id, self.name, self.fullname, self.password)
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key = True)
email_address = Column(String, nullable = False)
# `relationship()` 는 ORM 에게 `Address.user` 를 통해서 `User` 클래스와 연결하게 만드는데,
# 이때 두 테이블간의 연결은 `ForeignKey` 관계를 이용한다.
#
# `relationship( ... backref = backref(''))` 는 거꾸로의 연결을 생성한다.
# `User.addresses` 를 통해서 `Address.id` 로 정렬된 `Address` 클래스로의 연결을 찾을 수 있다.
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship('User', backref = backref('addresses', order_by = id))
def __repr__(self):
return '<Address (id = {0}, email_address = {1})>'.format(self.id, self.email_address)
# create table
Base.metadata.create_all(engine)
# create ORM's handle to database called `session`
Session = sessionmaker(bind=engine)
session = Session()
In [2]:
# add instances
ed_user = User(name = 'ed', fullname = 'ed jones', password = 'ed_passwd')
jack = User(name = 'jack', fullname = 'jack bean', password = 'jack_password')
print ed_user.addresses
print jack.addresses
jack.addresses = \
[
Address(email_address = 'abc@jack.com'),
Address(email_address = 'jack@jack.com')
]
print jack.addresses[0]
print jack.addresses[1]
# insert into database
session.add_all([ed_user, jack])
session.commit()
In [3]:
# select `addresses` table
for address in session.query(Address).all():
print address
# select all
for user in session.query(User).all():
print 'user = {0}'.format(user)
for email_address in user.addresses:
print '... email_address = {0}'.format(email_address)
In [4]:
for u, a in session.query(User, Address).filter(User.id == Address.user_id).filter(Address.email_address == 'jack@jack.com').all():
print '> {0}'.format(u)
print '> {0}'.format(a)
# more explicit way
# no need to specify `User.id == Address.user_id`, because there are only one `ForeignKey` between them.
print '> {0}'.format( session.query(User).join(Address).filter(Address.email_address == 'jack@jack.com').all() )
# if more than one ForeignKey or No `foreignkey` exists, use following form.
print '> {0}'.format( session.query(User).join(Address, User.id == Address.user_id).filter(Address.email_address == 'jack@jack.com').all() )
print '> {0}'.format( session.query(User).join(User.addresses).filter(Address.email_address == 'jack@jack.com').all() )
print '> {0}'.format( session.query(User).join(Address, User.addresses).filter(Address.email_address == 'jack@jack.com').all() )
print '> {0}'.format( session.query(User).join('addresses').filter(Address.email_address == 'jack@jack.com').all() )
# outerjoin() - LEFT OUTER JOIN
for u in session.query(User).outerjoin(User.addresses).all():
print '> {0}'.format(u)
In [ ]:
In [5]:
print 'before count = {0}'.format( session.query(func.count(User.id)).scalar() )
session.query(User).filter(User.name == 'jack').delete()
session.expire_all()
#session.commit()
print 'after count = {0}'.format( session.query(func.count(User.id)).scalar() )