SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data

及时获取更新,请关注公众号“爬虫技术学习”(spider-learn)

多年大厂求职&面试官经验,简历付费优化,¥ 500/次。

公众号“爬虫技术学习(spider-learn)”

About 逸飞

后端工程师

发表评论

邮箱地址不会被公开。 必填项已用*标注