Python 微型ORM Peewee 教程

Python 中最著名的 ORM 自然是 sqlalchemy 了,但是 sqlalchemy 有些年头了,体积庞大,略显笨重。Peewee 还比较年轻,历史包袱比较少,也仅仅支持 Postgres、MySQL、Sqlite 这三种互联网公司最常见的数据库,所以整体上来说是比较轻量的。

连接和创建数据库

db.init(**args)
db.connect()
db.create_table([Person])

连接池

自动重连,保持连接

在长时间运行的后台脚本使用数据库的时候,可能会遇到连接丢失的问题。peewee 提供了一个 Mixin 可以在连接丢失时候重连,这点比 django 方便多了。

from peewee import MySQLDatabase
from playhouse.shortcuts import ReconnectMixin

class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):
    pass

db = ReconnectMySQLDatabase('my_app', ...)

定义表

peewee 在创建模型的时候就设定了数据库链接,个人感觉这个设计似乎不是很好。不过好在可以先不指定参数,而在实际使用的时候再链接数据库。

import peewee as pw

db = SqliteDatabase(None)  # 这里不配置数据库链接是为了之后方便更改不同环境

class Person(pw.Model):
    name = pw.CharField()
    birthday = pw.DateField()

    class Meta:
        database = db

class Pet(Model):
    owner = ForeignKeyField(Person, backref='pets')
    name = CharField()
    animal_type = CharField()

    class Meta:
        database = db

如果有自引用的外键,可以使用 "self" 来指定。如果有循环引用的外键,可以使用 DeferredForeignKey。
在 django 的 ORM 中,我们可以直接使用 FIELD_id 这样来访问一个外键的 id。这个在 peewee 中也是支持的。但是在设置的时候却不需要加上 _id 的后缀。在使用 where 语句的时候也不需要使用后缀。

event_id = ticket.event_id
ticket.event = new_event_id
Ticket.select().where(event == desired_event_id)

执行裸SQL

database.execute_sql()

增删改查

读取数据

基本的语法是 Model.select(fields).where(**coditions).get(). 或者直接简写成 Model.get()

# peewee 只会查询一次数据库,不管迭代多少次。
query = Pet.select().where(Pet.animal_type == 'cat')
for pet in query:
    print(pet.name, pet.owner.name)  # 注意这里有 N+1 问题,N 指的是获取 owner.name

grandma = Person.get(Person.name == 'Grandma L.')

# 对于 id 可以直接使用 get_by_id

Person.get_by_id(100)

# 使用 get_or_none 阻止抛出异常

Person.get_or_none()

# 可以使用 join 解决 N+1 问题
query = (Pet
         .select(Pet, Person)
         .join(Person)
         .where(Pet.animal_type == 'cat'))
         .order_by(Pet.name)  # 或者 Pet.name.desc() 逆序排列

for pet in query:
    print(pet.name, pet.owner.name)

可以直接使用 | 来作为查询条件,这个相比 django 需要使用 Q 来说,设计地非常优雅。

d1940 = date(1940, 1, 1)
d1960 = date(1960, 1, 1)
query = (Person
         .select()
         .where((Person.birthday < d1940) | (Person.birthday > d1960)))

for person in query:
    print(person.name, person.birthday)

# prints:
# Bob 1960-01-15
# Grandma L. 1935-03-01

query.count()  #  返回记录的大小

get_or_create

peewee 模仿 django 实现了 get_or_create 的方法。注意他的参数是 Django 风格的,而不是 peewee 的 model.attr == xxx 的风格。

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={'dob': dob, 'favorite_color': 'green'})

iterator

对于返回结果过多的查询,可以使用 iterator 方法。

返回简单对象

插入数据

跟 django 的 ORM 貌似是一样的。使用 Model.create() 或者 Model.save() 或者 Model.insert()

from datetime import date

# 使用 save
uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15))
uncle_bob.save() # bob is now stored in the database

# 使用 create
grandma = Person.create(name='Grandma', birthday=date(1935, 3, 1))
bob_kitty = Pet.create(owner=uncle_bob, name='Kitty', animal_type='cat')  # 带有外键的宠物

# 使用 bulk_create
User.bulk_create(users, batch_size=100)

# 使用 insert
User.insert(username="mickey").execute()

# 使用 insert many。或者使用 tuple 也可以
data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
        ('val2-1', 'val2-2'),
        ('val3-1', 'val3-2')]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

更新数据

可以使用 Model.update 或者 model.save 更新数据。

# 使用 save 更新
herb_fido.owner = uncle_bob
herb_fido.save()

# 使用 update 更新
query = Tweet.update(is_published=True).where(Tweet.creation_date < today)

# 批量更新数据
# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]

# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

需要注意的是,在使用 update 的时候千万不要在 Python 中使用计算再更新,要使用 SQL 语句来更新,这样才能具有原子性。

错误做法

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

正确做法

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

删除数据

可以使用 model.delete_instance 或者 Model.delete。

# 使用 object.delete_instance
herb_mittens.delete_instance()

# 使用 Model.delete
Tweet.delete().where(Tweet.creation_date < one_year_ago).execute()

一些有用的拓展

模型转换成字典

除了在查询的时候使用 model.dicts 以外,还可以使用 model_to_dict(model) 这个函数。

>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

从数据库生成模型

最后也是最牛逼的一点,可以使用 pwiz 工具从已有的数据库产生 peewee 的模型文件:

python -m pwiz -e postgresql charles_blog > blog_models.py

参考

  1. https://stackoverflow.com/questions/45345549/peewee-mysql-server-has-gone-away-error/57797698#57797698

About 逸飞

后端工程师

发表评论

电子邮件地址不会被公开。 必填项已用*标注