In [1]:
import sqlalchemy
# Show the installed SQLAlchemy version as this cell's output.
sqlalchemy.__version__
Out[1]:
In [2]:
from sqlalchemy import create_engine

# In-memory SQLite database; echo=True asks the engine to log the SQL it emits.
engine = create_engine(
    'sqlite:///:memory:',
    echo=True,
)
In [4]:
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class; every mapped class in this notebook derives from it.
Base = declarative_base()
print(Base)
In [10]:
class UserA(Base):
    """Mapped class for the ``users_a`` table, using ``String`` columns without a length."""

    __tablename__ = 'users_a'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column(String)

    def __repr__(self):
        """Return a debug string showing name, fullname and password."""
        # Three positional arguments are supplied, so valid indices are 0-2.
        # The previous '{3}' placeholder raised IndexError on every repr().
        return "<UserA(name='{0}', fullname='{1}', password='{2}')>".format(
            self.name, self.fullname, self.password)
In [9]:
# Inspect the Table object attached to the UserA mapped class.
UserA.__table__
Out[9]:
Table 객체는 MetaData 객체에 속해 여러 메타데이터와 함께 관리되지만, 지금은 신경 쓸 필요 없다.
Table(
'users', MetaData(bind=None),
Column('id', Integer(), table=<users>, primary_key=True, nullable=False),
Column('name', String(), table=<users>),
Column('fullname', String(), table=<users>),
Column('password', String(), table=<users>),
schema=None)
`Base.metadata.create_all(engine)` 을 호출해서 실제로 `users` 테이블을 생성한다.
`String(250)` 형태로 써주면 VARCHAR 에 length 를 지정할 수 있다.
In [5]:
class User(Base):
    """Mapped class for the ``users`` table, using length-bounded ``String`` columns."""

    __tablename__ = 'users'
    # Overwrite the definition when a `users` table already exists
    # (e.g. when this cell is executed again).
    __table_args__ = {'extend_existing': True}

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    fullname = Column(String(255))
    password = Column(String(255))

    def __repr__(self):
        """Return a debug string showing id, name, fullname and password."""
        # Order matters: the template refers to these values by position.
        fields = (self.name, self.fullname, self.password, self.id)
        return "<User(id={3}, name='{0}', fullname='{1}', password='{2}')>".format(*fields)
# Actually create the `users` table in the database.
Base.metadata.create_all(engine)
# Inspect the Table behind `User`; use `UserA.__table__` to see `users_a`.
User.__table__
Out[5]:
In [6]:
# Build a User in memory only; nothing is sent to the database here.
ed_user = User(
    name="ed",
    fullname="Ed jones",
    password="EdsPasswordz",
)
print(ed_user)
# `id` was never assigned; SQLAlchemy supplies the default value on access.
print(ed_user.id)
In [7]:
from sqlalchemy.orm import sessionmaker

# Session factory; it is bound to the engine before any session is created.
Session = sessionmaker()
Session.configure(bind=engine)

# Create the session instance used by the rest of the notebook.
session = Session()
In [8]:
# Stage a new User in the session.
ed_user = User(name='ed', fullname='ed jones', password='edspasswordz')
session.add(ed_user)

# Look the user up again by name and show it together with its id.
our_user = session.query(User).filter(User.name == 'ed').first()
print('{0} {1}'.format(our_user, our_user.id))
여러 `User` 객체를 한 번에 insert 한다.
In [8]:
# Stage seven users: 'somma' plus 'somma1' .. 'somma6'. The same suffix is
# appended to the name, the full name and the password of each user.
session.add_all(list(
    User(
        name='somma' + suffix,
        fullname='yonghwan, noh' + suffix,
        password='passwordzzzzzz' + suffix,
    )
    for suffix in ('', '1', '2', '3', '4', '5', '6')
))
In [9]:
print(ed_user)
ed_user.password = 'changed passwordzz'
print(ed_user)

# Objects that were modified (tracked through the identity map).
session.dirty
# Objects that were added.
session.new
# Write the changes to the database.
session.commit()

ed_user.id
Out[9]:
In [9]:
# Change ed_user's name.
print(ed_user.name)
ed_user.name = 'not ed jones'

# Stage an erroneous user, `fake_user`.
fake_user = User(name="fakeuser", fullname="invalid", password="abcde")
session.add(fake_user)

# Query for both the renamed user and the fake one.
pending_names = ['not ed jones', 'fakeuser']
session.query(User).filter(User.name.in_(pending_names)).all()
Out[9]:
In [10]:
# Roll the session back, then repeat the query and re-check ed_user's name.
session.rollback()

print(session.query(User).filter(User.name.in_(['not ed jones', 'fakeuser'])).all())
print(ed_user.name)
In [10]:
# Full User objects, ordered by id.
for instance in session.query(User).order_by(User.id):
    print('{0} {1}'.format(instance.name, instance.fullname))

# Selected columns only; each row unpacks into (name, fullname).
for name, fullname in session.query(User.name, User.fullname):
    print('{0} {1}'.format(name, fullname))

# Entity and column together; row fields are accessed by attribute name.
for row in session.query(User, User.name).all():
    print('{0} {1}'.format(row.User, row.name))

# A labelled column is exposed under the label.
for row in session.query(User.name.label('name_label')).all():
    print(row.name_label)
In [11]:
from sqlalchemy.orm import aliased

# alias: query through a named alias of User; the row exposes that name.
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias)

# limit: slice the ordered query.
for u in session.query(User).order_by(User.id)[1:3]:
    print(u)
In [11]:
# Without unpacking, each result is a one-element row object.
for name in session.query(User.name).filter_by(fullname='ed jones'):
    print('{0} {1}'.format(name[0], type(name)))

# filter_by(): keyword arguments, row unpacked to its single value.
for (name,) in session.query(User.name).filter_by(fullname='ed jones'):
    print(name)

# filter(): column expressions.
for (name,) in session.query(User.name).filter(User.fullname == 'ed jones'):
    print(name)

# Chained filter() calls.
for (name,) in (session.query(User.name)
                .filter(User.name == 'ed')
                .filter(User.fullname == 'ed jones')):
    print(name)
In [12]:
# is not null (comparison form)
print(session.query(User).filter(User.name != None).first())
# is null
print(session.query(User).filter(User.name.is_(None)).first())
# is not null (method form)
print(session.query(User).filter(User.name.isnot(None)).first())

# not in
print(session.query(User).filter(~User.name.in_(['ed', 'somma'])).first())

# in
print(session.query(User).filter(User.name.in_(['ed', 'somma'])).first())

# like
print(session.query(User).filter(User.name.like('%somma%')).first())

# not equals
print(session.query(User).filter(User.name != 'ed').first())

# equals
print(session.query(User).filter(User.name == 'ed').first())
In [55]:
from sqlalchemy import and_

# and: explicit and_()
print(session.query(User).filter(
    and_(User.name == 'ed', User.fullname == 'ed jones')
).first())

# Send multiple expressions to .filter()
print(session.query(User).filter(
    User.name == 'ed',
    User.fullname == 'ed jones',
).first())

# Or chain multiple filter()/filter_by() calls
print(session.query(User)
      .filter(User.name == 'ed')
      .filter(User.fullname == 'ed jones')
      .first())
In [58]:
from sqlalchemy import or_

# or: explicit or_()
print(session.query(User).filter(
    or_(User.name == 'ed', User.fullname == 'ed jones')
).first())
In [73]:
query = (
    session.query(User)
    .filter(User.name.like('somm%'))
    .order_by(User.id)
)

# all()
for row in query.all():
    print(row)

# first()
print(query.first())
In [89]:
# one()
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound

# The exception is formatted directly instead of reading `e.message`:
# that attribute is deprecated since Python 2.6 and absent in Python 3.

# if no result
try:
    query = session.query(User).filter(User.name == 'no_name').order_by(User.id)
    users = query.one()
except NoResultFound as e:
    print('exception = {0}'.format(e))

# if multiple results
try:
    query = session.query(User).filter(User.name.like('somma%')).order_by(User.id)
    users = query.one()
    print(users)
except MultipleResultsFound as e:
    print('exception = {0}'.format(e))

# scalar()
# return first element of first result or None if no result present.
# if multiple result returned, `MultipleResultsFound` exception raised.
try:
    query = session.query(User).filter(User.name.like('somma%')).order_by(User.id)
    users = query.scalar()
except MultipleResultsFound as e:
    print('exception = {0}'.format(e))
In [13]:
from sqlalchemy import text

# Literal SQL fragments for the filter and the ordering.
for user in session.query(User).filter(text('id < 3')).order_by(text('id')).all():
    print(user.name)

# Bound parameters (:id, :name) are supplied through params().
for user in (session.query(User)
             .filter(text('id < :id and name = :name'))
             .params(id=3, name='somma')
             .all()):
    print(user.name)

# A complete textual statement loaded as User entities.
for users in (session.query(User)
              .from_statement(text('select * from users where name=:name'))
              .params(name='ed')
              .all()):
    print(users)

# Plain column names, including a computed column, from a textual statement.
# NOTE(review): string column names passed to query() may be rejected by
# newer SQLAlchemy releases - confirm against the installed version.
for user_id, name, third_ret in (
        session.query('id', 'name', 'the_number_12')
        .from_statement(text('select id, name, 12 as the_number_12 from users where name like :name'))
        .params(name='somma%')
        .all()):
    print('{0}, {1}, {2}'.format(user_id, name, third_ret))
In [104]:
# Number of User rows, then the number whose name starts with 'somma'.
print(session.query(User).count())
print(session.query(User).filter(User.name.like('somma%')).count())
In [108]:
from sqlalchemy import func

# Count of rows per distinct name.
(session.query(func.count(User.name), User.name)
 .group_by(User.name)
 .all())

# select count(*) from users
session.query(func.count('*')).select_from(User).scalar()

# `select_from()` can be dropped when counting User's primary key instead.
session.query(func.count(User.id)).scalar()
Out[108]:
In [46]:
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, backref
class Address(Base):
    """Mapped class for the ``addresses`` table; ``user_id`` references ``users.id``."""

    __tablename__ = 'addresses'
    # __table_args__ = {'extend_existing': True}  # overwrite if the table already exists

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))

    # Link to the owning User; the backref is named `addresses` and is
    # ordered by this class's `id` column.
    user = relationship(
        'User',
        backref=backref('addresses', order_by=id),
    )

    def __repr__(self):
        """Return a debug string showing the e-mail address."""
        return '<Address (email_address = {0})>'.format(self.email_address)
# Create the table for the newly mapped class in the database.
Base.metadata.create_all(engine)