In [1]:
from sqlalchemy import create_engine, text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker, aliased, relationship
engine = create_engine('mysql+pymysql://test:test@localhost/test')
Base = declarative_base()
Session = sessionmaker(bind=engine)
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32))
fullname = Column(String(32))
password = Column(String(32))
address = relationship("Address", order_by="Address.id", back_populates="user")
def __repr__(self):
return "<User(name='{}', fullname='{}', password='{}')".format(
self.name, self.fullname, self.password)
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
email_address = Column(String(64), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship("User", back_populates="address")
def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address
# 这里的创建语句的含义是,如果表已经存在的话,就不会再创建了
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
In [2]:
session = Session()
In [3]:
jack = User(name='jack', fullname='Jack Bean', password='gjffdd')
In [4]:
jack.address
Out[4]:
In [5]:
jack.address = [
Address(email_address='jack@gmail.com'),
Address(email_address='jack@qq.com')
]
In [6]:
jack.address[0]
Out[6]:
In [7]:
jack.address[1].user
Out[7]:
In [8]:
session.add(jack)
session.commit()
In [9]:
jack = session.query(User).\
filter_by(name='jack').first()
In [10]:
jack
Out[10]:
In [11]:
jack.address
Out[11]:
In [12]:
for u, a in session.query(User, Address).\
filter(User.id == Address.user_id).\
filter(Address.email_address == "jack@gmail.com").\
all():
print(u)
print(a)
In [13]:
users = session.query(User).join(Address).\
filter(Address.email_address == "jack@gmail.com").\
all();
In [14]:
users
Out[14]:
In [15]:
from sqlalchemy.orm import aliased
adalias1 = aliased(Address)
adalias2 = aliased(Address)
for username, email1, email2 in \
session.query(User.name, adalias1.email_address, adalias2.email_address).\
join(adalias1, User.address).\
join(adalias2, User.address).\
filter(adalias1.email_address=='jack@gmail.com').\
filter(adalias2.email_address=='jack@qq.com'):
print(username, email1, email2)
In [16]:
from sqlalchemy.sql import func
stmt = session.query(Address.user_id, func.count('*').\
label('address_count')).\
group_by(Address.user_id).subquery()
In [17]:
for u, count in session.query(User, stmt.c.address_count).\
outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id):
print(u, count)
In [18]:
stmt = session.query(Address).\
filter(Address.email_address != 'j25@yahoo.com').\
subquery()
adalias = aliased(Address, stmt)
for user, address in session.query(User, adalias).\
join(adalias, User.address):
print(user)
print(address)
In [19]:
from sqlalchemy.sql import exists
In [20]:
stmt = exists().where(Address.user_id == User.id)
In [21]:
for name, in session.query(User.name).filter(stmt):
print(name)
In [22]:
# 这里查找的是任意一个拥有对应的 Address 对象的 User
for name, in session.query(User.name).filter(User.address.any()):
print(name)
In [23]:
# 这里匹配的是拥有对应的 Address 对象且 Address 的 email_address 复合 “%gmail%” 的 User
for name, in session.query(User.name).filter(User.address.any(Address.email_address.like('%gmail%'))):
print(name)
In [24]:
# 这里查找的是任意的Address对应的用户的用户名是`jack`的补集
session.query(Address).filter(~Address.user.has(User.name == 'jack')).all()
Out[24]:
下面这个例子中,使用了 subqueryload, User.address 相关的对象会被立刻载入
In [25]:
from sqlalchemy.orm import subqueryload
In [26]:
u = session.query(User).filter(User.name == 'jack').options(subqueryload(User.address)).first()
In [27]:
u
Out[27]:
In [28]:
u.address
Out[28]:
In [29]:
u.address[0].id
Out[29]:
In [30]:
from sqlalchemy.orm import joinedload
In [31]:
jack = session.query(User).filter(User.name == 'jack').options(joinedload(User.address)).first()
In [32]:
jack.address
Out[32]:
In [33]:
jack.address[0].id
Out[33]:
In [34]:
from sqlalchemy.orm import contains_eager
In [35]:
jack_address = session.query(Address)\
.join(Address.user)\
.filter(User.name == 'jack')\
.options(contains_eager(Address.user))
In [36]:
jack_address[0].id
Out[36]:
In [37]:
jack_address[0].user
Out[37]:
In [38]:
session.delete(jack)
In [39]:
session.query(User).filter(User.name == 'jack').count()
Out[39]:
In [40]:
session.query(Address).filter(Address.email_address.in_(['jack@gmail.com', 'jack@qq.com'])).count()
Out[40]:
In [41]:
session.close()
In [42]:
Base = declarative_base()
In [43]:
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32))
fullname = Column(String(32))
password = Column(String(32))
address = relationship("Address", order_by="Address.id", back_populates="user",
cascade="all, delete, delete-orphan")
def __repr__(self):
return "<User(name='{}', fullname='{}', password='{}')".format(
self.name, self.fullname, self.password)
In [44]:
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
email_address = Column(String(64), nullable=False)
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship("User", back_populates="address")
def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address
In [46]:
# 这里的创建语句的含义是,如果表已经存在的话,就不会再创建了
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
In [47]:
Session = sessionmaker(bind=engine)
In [48]:
session = Session()
In [49]:
jack = User(name='jack', fullname='Jack Bean', password='gjffdd')
jack.address = [
Address(email_address='jack@gmail.com'),
Address(email_address='jack@qq.com')
]
In [50]:
session.add(jack)
session.commit()
In [51]:
jack = session.query(User).get(1)
In [53]:
del jack.address[0]
In [54]:
session.query(Address).count()
Out[54]:
In [55]:
session.delete(jack)
In [56]:
session.query(Address).count()
Out[56]:
In [57]:
session.commit()