Python

一分钟学一个 Python 标准库之 Pathlib

相对于 os.path 来说,Pathlib 极大地简化了路径相关的操作。举个例子来说:

获取当前文件路径下的 default.yaml 文件

import os

os.path.join(os.path.dirname(__file__), "default.yaml")

只需要:

from pathlib import Path

Path(__file__).parent / "default.yaml"

不难看出,pathlib 有几个优点:

  1. 字符少了很多,出 bug 的地方就少了很多;
  2. 和操作系统一样,pathlib 直接使用路径分隔符 / 来操作,而不是 join 的两个参数;
  3. 非常符合直觉,从左到右阅读。而 os.path 使用函数参数,需要来回跳跃理解。相比之下,Pathlib 阅读起来很清晰。

Python3 标准库中所有接受 str 作为路径参数的地方,现在都可以接受一个 path 对象了。如果你使用的第三方库只接受 str 作为参数,
可以通过 path_str = str(path) 来转化一下。

Pathlib 的其他常用方法:

from pathlib import Path

cwd = Path.cwd()  # 获取当前目录
home = Path.home()  # 获取家目录,比如 /home/ubuntu
path = Path("/home/yifei")  # 创建一个新的 path 对象

path.is_dir()  # 是否是目录
path.is_file()  # 是否是普通文件
path.exists()  # 路径是否存在
path.resolve()  # 解析成绝对路径,比如 Path(".").resolve() 相当于 Path.cwd()

path.mkdir(parents=True, exists_ok=True) # 类似 mkdir -p

# 遍历目录
for child in path.iterdir():
    print(child)

# 除此之外,还有几个很甜的方法,省去了 with open 语句
path = Path("/home/ubuntu/readme.txt")
text = path.read_text()
path.write_text(text)

path = Path("/home/ubuntu/image.png")
image = path.read_bytes()
path.write_bytes(image)

>>> path
PosixPath('/home/ubuntu/test.md')
>>> path.name
'test.md'
>>> path.stem
'test'
>>> path.suffix
'.md'
>>> path.parent
PosixPath('/home/ubuntu')
>>> path.parent.parent
PosixPath('/home')
>>> path.anchor
'/'

在这个注意力涣散的年代,想要经常写一些长篇大论的帖子实在太难了,可能一个月才能憋出一篇。
不如分享写简单但是有用的知识点,或许还能经常更新。

以上就是全部内容啦。

参考

  1. https://realpython.com/python-pathlib/
  2. https://docs.python.org/3/library/pathlib.html

SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data

Python 3 中的 dataclass

在 Python 中,如果要为一个类添加一些数据成员的话,需要做的事情还挺多,比如说编写 __init__,
__str__ 这些函数,代码都是重复的,没啥意义。在 Python 3.7 中,终于添加了一个语法糖,叫做
dataclass. 下面我们就来看一下吧~

# 注意!包名叫 dataclasses, 多了个 es
from dataclasses import dataclass

@dataclass
class InventoryItem:
    """Class for keeping track of an item in inventory."""
    name: str
    unit_price: float
    quantity_on_hand: int = 0

    def total_cost(self) -> float:
        return self.unit_price * self.quantity_on_hand

上面的代码就相当于以前的:

def __init__(self, name: str, unit_price: float, quantity_on_hand: int=0):
    self.name = name
    self.unit_price = unit_price
    self.quantity_on_hand = quantity_on_hand

def __repr__(self):
    ...

def __eq__(self):
    ...

...

我们知道,在 Python 的默认参数中,使用 mutable(可变) 的对象是一种常见的坑,在 dataclass 中当然也存在
了,还好标准库中给我们提供了一个方法。

# 会报错
@dataclass
class Request:
    headers: dict = {}

# 相当于
class Request:
    def __init__(self, headers={}):
        self.headers = headers

# 正确的写法
from dataclasses import field

@dataclass
class Request:
    headers: dict = field(default_factory=dict)

字典这种类型是比较容易想起来不能直接做参数的,比较坑的是对于其他的自定义对象,Python 解释器并不会提
示有问题,比如说这样:

# 千万别这么做
@dataclass
class Request:
    headers: Headers = Headers()

这时候坑爹的事情就发生了,每次创建新的 Request 对象引用的都是同一个 Headers 对象,也就是在声明这个类
的同时产生的这个 Headers 对象!原因也很简单,就像是上面的 dict 一样,这个 Headers 并不是在 init 函数
中,所以只会产生一次。所以,需要牢记的是:dataclass 中的所有对象默认值就相当于函数的默认参数,永远不
要传递一个 mutable 就好了。

# 上面的例子相当于
class Request:
    def __init__(self, headers=Headers()):
        self.headers = headers

# 正确的做法
@dataclass
class Request:
    headers: Headers = field(default_factory=Headers)

dataclasses 模块中还提供了 asdict 方法,这样就可以方便地转换为 json 对象啦。

from dataclasses import asdict

@dataclass
class Request:
    url: str = ""
    method: str = ""

req = asdict(Request())

参考资料

  1. https://docs.python.org/3/library/dataclasses.html

Python metaclass 的原理和应用

元编程(meta programming)是一项很神奇的能力,可以通过代码在运行时动态生成代码。元类(meta classes)是 Python 提供的一种元编程的能力。在 Python 中,类也是一种对象,那么类这种对象就是元类的实例,所以我们可以在运行时通过实例化元类动态生成类。

使用 type “函数”

首先我们来了解一下 type,type 可以作为函数使用,用来获得对象的类型:

>>> class Foo:
...     pass
>>> obj = Foo()
>>> obj.__class__
<class "__main__.Foo">
>>> type(obj)
<class "__main__.Foo">
>>> obj.__class__ is type(obj)
True

实际上 type 并不是一个函数,而是一个类,我们可以使用 type(type) 来确定一下:

>>> type(type)
<class "type">

type 实际上不只是类,而是一个“元类”。我们接下来要可以看到,所有的元类都需要继承自 type。type 是所以类的元类,所以在上面的例子中 x 是 Foo 的实例,Foo 是 type 的实例,type 又是他自己的实例。

file

使用 type 动态创建类

如果传递给 type 的参数是三个的时候,type 的语义就不再是返回给定参数的类,而是实例化生成一个新的类。

type(name: str, bases: tuple, namespace: dict)

第一个参数是新生成的类的名字;第二个参数是新生成的类的基类列表;第三个参数是要个这个类绑定的属性的列表,比如说这个类的一些方法。实际上 class Foo 这种语法只是使用 type 生成类的语法糖而已。

最简单的一个例子,比如我们要创建 Foo[0..9] 这些类,可以这样做:

classes = []
for i in range(10):
    cls = type("Foo%s" % i, tuple(), {})
    classes.append(cls)

# 就像使用普通类一样初始化 Foo0

foo0  = clssses[0]()

如果要实现类的方法,一定要记得同样是要使用 self 变量的。在 Python 中 self 只是一个约定俗称的变量,而不是关键字。

def __init__(self, name):
    self.name = name

def print_name(self):
    print(self.name)

Duck = type("Duck", tuple(), {"__init__": __init__, "print_name": print_name})

duck = Duck("Donald")

duck.print_name()
# Donald

创建自己的元类

首先我们来回顾一下 Python 中类的初始化过程:

foo = Foo()

当这条语句运行的时候,Python 会依次调用 __new____init__ 方法。其中 __new__ 方法在 __init__ 之前调用,并返回已经创建好的新对象,而 __init__ 函数是没有返回结果的。一般情况下,我们都会覆盖 __init__ 方法来对新创建的对象做一些初始化操作。

现在回归到元类上,进入烧脑部分。前面我们说过元类的实例化就是类,所以大致相当于:

Foo = MetaFoo(name, bases, attrs)  # MetaFoo 默认情况下是 type
foo = Foo()

默认情况下,所有类的元类是 type,也就是在这个类是通过 type 来创建的,这和前面说的通过 type 来动态创建类也是一致的。

那么怎样定义一个 MetaFoo 呢?只需要继承自 type 就行了。因为元类的实例化就是类的创建过程,所以在元类中,我们可以修改 __new__ 来在 __init__ 之前对新创建的类做一些操作。

>>> class MetaFoo(type):
...     def __new__(cls, name, bases, namespace):
...         x = super().__new__(cls, name, bases, namespace)  # super实际上就是 type
...         x.bar = 100  # 为这个类增加一个属性
...         return x
...

>>> Foo = MetaFoo("Foo", tuple(), {})  # MetaFoo 在这里就相当于 type 了,可以动态创建类
>>> Foo.bar
100
>>> foo = Foo()
>>> foo.bar
100

在这里我们创建了 MetaFoo 这个元类,他会给新创建的类增加一个叫做 bar 的属性。

在实际的代码中,我们一般还是不会直接动态生成类的,还是调用 class Foo 语法来生成类比较常见一点,这时候可以指定 metaclass 参数就好了。可以通过 Foo(metaclass=MetaFoo) 这种方式来指定元类。

class Foo(metaclass=MetaFoo):
    pass

这种定义和上面的元类用法效果完全是一致的。

一个现实世界的元类例子

在 django.models 或者 peewee 等 ORM 中,我们一般使用类的成员变量来定义字段,这里就用到了元类。

class Field:
    pass

class IntegerField(Field):
    pass

class CharField(Field):
    pass

class MetaModel(type):
    def __new__(meta, name, bases, attrs):
        # 这里最神奇的是:用户定义的类中的 bases 和 attrs 都会作为参数传递进来
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                value.name = "%s.%s" % (name, key)
                fields[key] = value
        for base in bases:
            if hasattr(base, "_fields"):
                fields.update(base._fields)
        attrs["_fields"] = fields
        return type.__new__(meta, name, bases, attrs)

class Model(metaclass=MetaModel):
    pass

这样用户使用的时候就可以这样定义:

>>> class A(Model):
...     foo = IntegerField()
...
>>> class B(A):
...     bar = CharField()
...
>>> B._fields
{"foo": Integer("A.foo"), "bar": String("B.bar")}

程序在执行的时候就可以直接访问 X._fields,而不用每次都通过反射遍历一次,从而提高效率以及做一些验证。

不过,其实这个完全可以通过装饰器来实现:

def model(cls):
    fields = {}
    for key, value in vars(cls).items():
        if isinstance(value, Field):
            value.name = "%s.%s" % (cls.__name__, key)
            fields[key] = value
    for base in cls.__bases__:
        if hasattr(base, "_fields"):
            fields.update(base._fields)
    cls._fields = fields
    return cls

@model
class A():
    foo = IntegerField()

class B(A):
    bar = CharField()

但是用装饰器的话,就失去了一些类型继承的语义信息。

总结与思考

Python 中的元编程还是一种很强大的特性,但是也比较复杂,有时候很难以理解。实际上,过分的动态特性也导致了 Python 的解释器和静态分析、自动补全等很难优化,因为有好多信息必须到运行时才能知道。

实际上近些年新开发的语言越来越多地加入了静态类型的特性,比如 swift, rust, go 等。就连 Python 本身也增加了 type hinting 的功能,很遗憾的是,这个功能不是强制性的,所以也很难用来提升性能。

元类这块应该是我在 Python 语言方面了解的最后一大块知识了。接下来除了写业务代码不会再深究 Python 了,研究 Golang 去了~

Au revoir, Python!

参考

  1. https://realpython.com/python-metaclasses/
  2. https://stackoverflow.com/questions/392160/what-are-some-concrete-use-cases-for-metaclasses
  3. https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
  4. https://stackoverflow.com/questions/2608708/what-is-the-difference-between-type-and-type-new-in-python

在 IPython 中自动重新导入包

在使用 IPython 交互性测试编写的函数的时候,可以打开自动重新导入包的功能,这样每次保存后就可以直接测试了。

In [1]: %load_ext autoreload

In [2]: %autoreload 2

其中三个数字的含义是:

  • %autoreload 0 – 关闭自动重新导入
  • %autoreload 1 – 只在 import 语句重新导入
  • %autoreload 2 – 调用的时候自动重新导入

如果想要在 IPython 中自动启用

$ ipython profile create
$ vim ~/.ipython/profile_default/ipython_config.py
c.InteractiveShellApp.extensions = ["autoreload"]
c.InteractiveShellApp.exec_lines = ["%autoreload 2"]

Python 微型ORM Peewee 教程

Python 中最著名的 ORM 自然是 sqlalchemy 了,但是 sqlalchemy 有些年头了,体积庞大,略显笨重。Peewee 还比较年轻,历史包袱比较少,也仅仅支持 Postgres、MySQL、Sqlite 这三种互联网公司最常见的数据库,所以整体上来说是比较轻量的。

连接和创建数据库

db.init(**args)
db.connect()
db.create_table([Person])

连接池

自动重连,保持连接

在长时间运行的后台脚本使用数据库的时候,可能会遇到连接丢失的问题。peewee 提供了一个 Mixin 可以在连接丢失时候重连,这点比 django 方便多了。

from peewee import MySQLDatabase
from playhouse.shortcuts import ReconnectMixin

class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):
    pass

db = ReconnectMySQLDatabase("my_app", ...)

定义表

peewee 在创建模型的时候就设定了数据库链接,个人感觉这个设计似乎不是很好。不过好在可以先不指定参数,而在实际使用的时候再链接数据库。

import peewee as pw

db = SqliteDatabase(None)  # 这里不配置数据库链接是为了之后方便更改不同环境

class Person(pw.Model):
    name = pw.CharField()
    birthday = pw.DateField()

    class Meta:
        database = db

class Pet(Model):
    owner = ForeignKeyField(Person, backref="pets")
    name = CharField()
    animal_type = CharField()

    class Meta:
        database = db

如果有自引用的外键,可以使用 "self" 来指定。如果有循环引用的外键,可以使用 DeferredForeignKey。
在 django 的 ORM 中,我们可以直接使用 FIELD_id 这样来访问一个外键的 id。这个在 peewee 中也是支持的。但是在设置的时候却不需要加上 _id 的后缀。在使用 where 语句的时候也不需要使用后缀。

event_id = ticket.event_id
ticket.event = new_event_id
Ticket.select().where(event == desired_event_id)

执行裸SQL

database.execute_sql()

增删改查

读取数据

基本的语法是 Model.select(fields).where(**coditions).get(). 或者直接简写成 Model.get()

# peewee 只会查询一次数据库,不管迭代多少次。
query = Pet.select().where(Pet.animal_type == "cat")
for pet in query:
    print(pet.name, pet.owner.name)  # 注意这里有 N+1 问题,N 指的是获取 owner.name

# 直接获取一条数据,select, where 全省略了
grandma = Person.get(Person.name == "Grandma L.")

# 或者全写出来
grandma = Person.select().where(Person.name == "Gramdma L.").get()

# in 查询使用 in_ 方法
Pet.select().where(Pet.id.in_([1,2]))

# 对于 id 可以直接使用 get_by_id

Person.get_by_id(100)

# 使用 get_or_none 阻止抛出异常

Person.get_or_none()

# 可以使用 join 解决 N+1 问题
query = (Pet
         .select(Pet, Person)
         .join(Person)
         .where(Pet.animal_type == "cat"))
         .order_by(Pet.name)  # 或者 Pet.name.desc() 逆序排列

for pet in query:
    print(pet.name, pet.owner.name)

可以直接使用 | 来作为查询条件,这个相比 django 需要使用 Q 来说,设计地非常优雅。

d1940 = date(1940, 1, 1)
d1960 = date(1960, 1, 1)
query = (Person
         .select()
         .where((Person.birthday < d1940) | (Person.birthday > d1960)))

for person in query:
    print(person.name, person.birthday)

# prints:
# Bob 1960-01-15
# Grandma L. 1935-03-01

query.count()  #  返回记录的大小

getorcreate

peewee 模仿 django 实现了 getorcreate 的方法。注意他的参数是 Django 风格的,而不是 peewee 的 model.attr == xxx 的风格。

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={"dob": dob, "favorite_color": "green"})

iterator

对于返回结果过多的查询,可以使用 iterator 方法。

返回简单对象

插入数据

跟 django 的 ORM 貌似是一样的。使用 Model.create() 或者 Model.save() 或者 Model.insert()

from datetime import date

# 使用 save
uncle_bob = Person(name="Bob", birthday=date(1960, 1, 15))
uncle_bob.save() # bob is now stored in the database

# 使用 create
grandma = Person.create(name="Grandma", birthday=date(1935, 3, 1))
bob_kitty = Pet.create(owner=uncle_bob, name="Kitty", animal_type="cat")  # 带有外键的宠物

# 使用 bulk_create
users = [User(username="u%s" % i) for i in range(10)]
User.bulk_create(users, batch_size=100)

# 使用 insert
User.insert(username="mickey").execute()

# 使用 insert many。或者使用 tuple 也可以
data_source = [
    {"field1": "val1-1", "field2": "val1-2"},
    {"field1": "val2-1", "field2": "val2-2"},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

# We can INSERT tuples as well...
data = [("val1-1", "val1-2"),
        ("val2-1", "val2-2"),
        ("val3-1", "val3-2")]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

更新数据

可以使用 Model.update 或者 model.save 更新数据。

# 使用 save 更新
herb_fido.owner = uncle_bob
herb_fido.save()

# 使用 update 更新
query = Tweet.update(is_published=True).where(Tweet.creation_date < today)

# 批量更新数据
# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username="u%s" % i) for i in (1, 2, 3)]

# Now we"ll modify the user instances.
u1.username = "u1-x"
u2.username = "u2-y"
u3.username = "u3-z"

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

需要注意的是,在使用 update 的时候千万不要在 Python 中使用计算再更新,要使用 SQL 语句来更新,这样才能具有原子性。

错误做法

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

正确做法

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

删除数据

可以使用 model.delete_instance 或者 Model.delete。

# 使用 object.delete_instance
herb_mittens.delete_instance()

# 使用 Model.delete
Tweet.delete().where(Tweet.creation_date < one_year_ago).execute()

一些有用的拓展

模型转换成字典

除了在查询的时候使用 model.dicts 以外,还可以使用 modeltodict(model) 这个函数。

>>> user = User.create(username="charlie")
>>> model_to_dict(user)
{"id": 1, "username": "charlie"}

从数据库生成模型

最后也是最牛逼的一点,可以使用 pwiz 工具从已有的数据库产生 peewee 的模型文件:

python -m pwiz -e postgresql charles_blog > blog_models.py

参考

  1. https://stackoverflow.com/questions/45345549/peewee-mysql-server-has-gone-away-error/57797698#57797698

使用 cProfile 和火焰图调优 Python 程序性能

本来想坐下来写篇 2018 年的总结,仔细想想这一年发生的事情太多了,还是写篇技术文章吧。

前几天调试程序,发现 QPS 总是卡在 20 左右上不去。开始以为是 IO 问题,就多开了些并发,然并卵,这才想到可能是 CPU 的问题。看了看监控,发现程序某一步的延迟在 400ms 左右,而且这一步是 CPU 密集的。当时开了 4 台双核的机器:(1s / 400ms) * 2 * 4 = 20 啊。看来需要优化下这一步的代码了,那么第一步就是找到可以优化的地方。

测量程序的性能之前并没有实际做过,Google 了一番,感觉标准库的 cProfile 似乎值得一试。

要测量的代码逻辑也很简单,使用 lxml 解析 HTML,然后提取一些字段出来,这些基本都是调用的 C 库了,解析的算法也不在 Python 中。看起来似乎没有什么能改进的地方,不管怎样,还是先跑一下吧。

cProfile 有多种调用方法,可以直接从命令行调用:

python -m cProfile -s tottime your_program.py

其中的 -s 的意思是 sort。常用的 sort 类型有两个:

  1. tottime,指的是函数本身的运行时间,扣除了子函数的运行时间
  2. cumtime,指的是函数的累计运行时间,包含了子函数的运行时间

要获得对程序性能的全面理解,经常需要两个指标都看一下。

不过在这里,我们并不能直接使用命令行方式调用,因为我的代码中还需要一些比较繁重的配置加载,如果把这部分时间算进去了,多少有些干扰,那么我们也可以直接在代码中调用 cProfile。

使用 cProfile 的代码如下:

import cProfile, pstats, io

pr = cProfile.Profile()
pr.enable()

extractor.extract(crawl_doc=doc, composition=PageComposition.row, rule=rule)

pr.disable()
s = io.StringIO()
sortby = "cumtime"  # 仅适用于 3.6, 3.7 把这里改成常量了
ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
ps.print_stats()
print(s.getvalue())

把需要 profile 的代码放到 pr.enable 和 pr.disable 中间就好了。注意这里我们使用了 cumtime 排序,也就是累计运行时间。

结果如下:

我们可以看到总的运行时间是 200ms,而其中红框内的部分就占了 100ms! 如果能够优化调的话,性能一下子就能提高一倍。红框内的代码是做什么的呢?我们知道解析一个 html 文档,第一步是建立 DOM 树,通常情况下,我们可能会从其中抽取一些链接。在网页中,链接不一定是绝对路径,也可能是 /images/2018-12-31-xxx.jpg 这样的相对路径。lxml 库帮我们做了一个贴心的默认值,那就是在构造 DOM 树的时候,根据传入的 url 来吧页面中的所有 url 都重写成绝对路径。看起来这是个很贴心的功能,但是在这里却成了性能瓶颈。为什么很耗时呢?大概是因为需要遍历整个 DOM 树,重写所有的链接吧。这显然不是我们需要的,我们只需要把抽取之后的链接还原成绝对路径就好了,而不是事先把所有链接都重写一遍。所以在这里我们直接去掉这个功能就好了。

修改代码之后,再次运行 profile 脚本,时长变成了 100ms:

这时候我们接着看,程序中下一个比较大头的时间占用:jsonfinder 和 json decode/encode。

jsonfinder 是一个有意思的库,它自动从 HTML 中查找 json 字符串并尝试解析,但是有时候也不太准。经常为了找到特定的值,还是需要使用正则。那么对于这个可有可无的功能,性能有这么差,还是删掉好了。

通过删代码,现在性能已经是原来的四倍了。

这时候发现代码里面有正则还挺花费时间的,不过还好,暂时先不管了。

刚刚都是只运行了一遍,测量结果难免有随机性,必定有失偏颇,实际上应该使用多个测试用例,成千上万次的跑,才能得到一个比较准确地结果。

上面这个小步骤基本没有什么可以优化的了,下面我们把优化目标扩大一点,并把次数先定为100.

下面这种图是按照 tottime 来排序的:

注意其中最耗时的步骤是 parseUnicodeDoc,也就是建树了,这是符合预期的,然而旁边的 ncalls 一栏却不太对劲了。我们明明只运行了 100 次,为什么这个函数调用了 300 次呢?显然代码中有重复建树的地方,也就是有隐藏的 bug。这种地方不经过 profile 很难浮现出来,因为程序本身的逻辑是对的,只是比较耗时而已。

优化之后,终于变成了 100. 从 cProfile 的表格现在已经看不出什么结果来了,下一步我们开始使用火焰图,可视化往往能让我们更容易注视到性能瓶颈。(为什么不一开始就用火焰图呢?因为我以为很麻烦。。实际很简单)

Python 中有一个第三方包(见参考文献)可以直接从 cProfile 的结果生成火焰图:

  1. 在原有的代码中加上一句: pr.dump_stats("pipeline.prof")
  2. 调用该工具:flameprof pipeline.prof > pipeline.svg

然后打开 SVG 文件就可以了:

其中火焰的宽度代表了运行的时长,我们现在的优化目标就是这些耗时比较长的步骤。

可以看大其中 mysql 的访问占了绝对的大头,按理说跑100次的话,不应该每次都花费时间在建立连接上啊,这里一定有问题。经过排查发现在某处链接是使用了 close_old_connections 来保证不会抛出数据库断开的异常,这还是在头条带来的习惯。。closeoldconnections 的功能是关闭已经失效的链接,看来我的理解还是有误的。先把这块删掉,最终解决应该是这块放到一个队列里,统一存入数据库。

去掉之后:

现在的大头又变成 lxml 的,又动了优化它的心思,lxml 是 libxml2 的一个 Python binding,查了下应该是最快的 html parser 了,这块真的没有什么优化空间。盯了一会儿,眼睛最终看到了一个小角落:

一个正则匹配居然占用了 8% 的运行时间,太不像话了。老早之前就听说 Python 的标准库正则性能不行,现在才发现原来是真的挺差劲的。Python 标准库的 re 模块采用的是 PCRE 的处理方式,而采用 NFA 的处理方式的正则要快很多,这块还需要再看一下。不过眼下倒是可以直接换一个库来解决。regex 模块是 re 模块的一个 drop-in replacement.

pip install regex and import regex as re,就搞定了

可以看到正则那块直接消失了。提升还是很大的。时间不早了,当天的优化就到此结束了。上线之后,积压一下子就下去了:

后记

要想调试的时候方便,在写代码的时候就要注意,尽量使自己的代码 mock-friendly 一点。如果需要引入外部的数据库、服务、API等等各种资源,最好有一个开关或者选项能够不加载外部资源,或者至少能够很方便地 mock 这些外部服务,这样方便对每一个小单元进行 profile。

总有人吐槽 Python 的性能低下,但是 Python 本来就不是做计算任务的呀,Python 是一门胶水语言,是用来写业务逻辑的,而不是用来写CPU密集的算法的。事实上复杂的解析一般都会用 C++ 这种硬核语言来写了,比如 numpy TensorFlow lxml。大多数程序员一天 90% 的工作除了和产品经理撕逼以外,也就是在写 CRUD,也就是调用这些包。所以瓶颈一般在 IO 上而不在 CPU 上,而解决 IO 的瓶颈手段就多了,Python 中至少有 多进程、多线程、AsyncIO、Gevent 等多种方法。不过方法多其实也是一个弊端,这几种方法可以说是基本互不兼容,对各种第三方库的支持也参差不齐。而 Go 在这方面就做地很好了,语言直接内置了 go 关键字,甚至都不支持多线程。所有的库都是支持一个统一的并发模型,对于使用者来说更简单。

Zen of Python 中有一句:There should be one way — preferably only one way — to do a thing. 这点上 Python 本身没有做到,反倒是 Go 实践地非常好。

扯远了,程序的瓶颈其实不外乎CPU、内存和 IO 三个方面,而 cProfile 和火焰图是判断 CPU 瓶颈的一把利器。

后面还发现了一些性能瓶颈,也列在这里:

  1. yaml 的反序列化时间过长。解决方法是添加了一个 Expiring LRU Cache,不要每次都去加载,当然牺牲的是一点点内存,以及当规则变更时会有一些延迟,不过都是可以接受的。之前早就听人说 Thrift 的序列化性能相比 Protobuf 太低,现在想想序列化和反序列化还真是一个很常见的性能瓶颈啊。

  2. 存储使用了 360 的 pika,pika 可以理解为一个基于 rocksdb 的硬盘版 redis。最开始的时候没多想,随便找了台机器搭了起开,把上面的问题解决之后,pika 的延迟很快大了起来,机器的监控也显示 IO 基本被打满了。这时候才发现原来这台机器没有用 SSD,果断换了 SSD 问题基本解决了。如果再有问题可能就需要集群了。

性能这个问题其实是典型的木桶理论的场景,系统的整体性能是由最差的一块决定的。所以也是一个不断迭代的过程。

祝大家新年快乐~

参考文献

  1. https://toucantoco.com/en/tech-blog/tech/python-performance-optimization
  2. https://docs.python.org/3.6/library/profile.html
  3. https://medium.com/build-smarter/blazing-fast-python-40a2b25b0495
  4. https://swtch.com/~rsc/regexp/regexp1.html