--- title: Peewee源码解读 date: 2017-02-05 tags: [Peewee, Python, 源码] ---

通过select查询语句引入


In [5]:
from model.db.tb_raw import TbRaw
query = TbRaw.select().where(TbRaw.id > 0)
print "select query:", query.sql()[0] % tuple(query.sql()[1])
print "query result:"
for item in query.dicts().execute():
    print item


select query: SELECT `t1`.`id`, `t1`.`name`, `t1`.`age`, `t1`.`create_time` FROM `tb_raw` AS t1 WHERE (`t1`.`id` > 0)
query result:
{'age': 12, 'create_time': datetime.datetime(2017, 2, 5, 0, 14, 6), 'id': 1, 'name': u'test'}
{'age': 20, 'create_time': datetime.datetime(2017, 2, 5, 0, 14, 55), 'id': 2, 'name': u'zhang'}

分析每一次调用

Model

其中Tb1是我们定义的一个Model

class TbRaw(Model):

    """
    """
    id = PrimaryKeyField()
    name = CharField()
    age = IntegerField()
    create_time = DateTimeField()

    class Meta(object):

        """表配置信息
        """
        database = MySQLDatabase(database="xyz", host="127.0.0.1", password="123456", user="root", port=3306)
        db_table = "tb_raw"

peewee对Model类指定BaseModel元类

class Model(with_metaclass(BaseModel)):

BaseModel中做的就是将TbRaw的配置信息转换成一个ModelOptions对象放到_meta中,其中包括数据库连接、字段信息、主键、表明、索引等。因为_meta是在元类中生成,所以Model表的配置信息不能针对实例做修改,而是一个整体的修改。这里对于做分表查询不能很好的支持。

Model中提供了SQL中的数据查询、操作、定义的类方法,对应方法返回的对象如下。

|Model方法|返回对象| |--------|--------| |select()|SelectQuery| |update()|UpdateQuery| |insert()|InsertQuery| |delete()|DeleteQuery| |raw()|RawQuery| |noop()|NoopSelectQuery|

SelectQuery

    @classmethod
    def select(cls, *selection):
        query = SelectQuery(cls, *selection)
        if cls._meta.order_by:
            query = query.order_by(*cls._meta.order_by)
        return query

select()方法返回一个SelectQuery对象,并将排序字段传递进去。

SelectQuery中提供保留的查询方法(group_by、having、order_by、window、limit、offset、paginate、distinct、where等) 这些查询方法通过修饰器@returns_clone每一次返回一个新的SelectQuery对象。

def returns_clone(func):
    """
    Method decorator that will "clone" the object before applying the given
    method.  This ensures that state is mutated in a more predictable fashion,
    and promotes the use of method-chaining.
    """
    def inner(self, *args, **kwargs):
        clone = self.clone()  # Assumes object implements `clone`.
        func(clone, *args, **kwargs)
        return clone
    inner.call_local = func  # Provide a way to call without cloning.
    return inner

return_clone首先会调用Query中的clone()方法,然后执行相应的保留查询方法。

def clone(self):
    query = type(self)(self.model_class)
    query.database = self.database
    return self._clone_attributes(query)

clone()方法会生成一个新的SelectQuery对象,然后复用数据库连接,克隆原Query实例中的对象。


In [7]:
# UNION查询
query = TbRaw.select().where(TbRaw.id >= 2) | TbRaw.select().where(TbRaw.id < 2)
print query.sql()[0] % tuple(query.sql()[1])
for item in query.dicts().execute():
    print item


(SELECT `t1`.`id`, `t1`.`name`, `t1`.`age`, `t1`.`create_time` FROM `tb_raw` AS t1 WHERE (`t1`.`id` >= 2)) UNION (SELECT `t2`.`id`, `t2`.`name`, `t2`.`age`, `t2`.`create_time` FROM `tb_raw` AS t2 WHERE (`t2`.`id` < 2))
{'age': 20, 'create_time': datetime.datetime(2017, 2, 5, 0, 14, 55), 'id': 2, 'name': u'zhang'}
{'age': 12, 'create_time': datetime.datetime(2017, 2, 5, 0, 14, 6), 'id': 1, 'name': u'test'}

SelectQuery中通过重载运算符实现UNION(并集)、INTERSECT(交集)、EXCEPT(差集)语句。

def compound_op(operator):
    def inner(self, other):
        supported_ops = self.model_class._meta.database.compound_operations
        if operator not in supported_ops:
            raise ValueError(
                'Your database does not support %s' % operator)
        return CompoundSelect(self.model_class, self, operator, other)
    return inner
_compound_op_static = staticmethod(compound_op)

__or__ = compound_op('UNION')
__and__ = compound_op('INTERSECT')
__sub__ = compound_op('EXCEPT')

def __xor__(self, rhs):
    # Symmetric difference, should just be (self | rhs) - (self & rhs)...
    wrapped_rhs = self.model_class.select(SQL('*')).from_(
        EnclosedClause((self & rhs)).alias('_')).order_by()
    return (self | rhs) - wrapped_rhs
def union_all(self, rhs):
    return SelectQuery._compound_op_static('UNION ALL')(self, rhs)

execute()

def execute(self):
    if self._dirty or self._qr is None:
        model_class = self.model_class
        query_meta = self.get_query_meta()
        ResultWrapper = self._get_result_wrapper()
        self._qr = ResultWrapper(model_class, self._execute(), query_meta)
        self._dirty = False
        return self._qr
    else:
        return self._qr

execute()方法是生成语句并获取返回数据的地方。