Month: 7月 2021

SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data

写一个 CRUD 还挺难的

让我们只从后端角度出发,考虑写一个简单的博客系统会有哪些问题。这篇文章谈论的并不是某个 Web 框架的 TODO list demo 之类的东西,那都是玩具性质的,而是会谈一谈生产环境中的要考虑的一些实际问题。本文中,我们也不会涉及到像是 MySQL 的几种隔离模式或者是 Kafka 是不是 Exactly Once 这种后端面试常问的八股文,而是从全局考虑一些简单但是又避不开的繁琐问题。

数据库

首先,需要定义一下数据库的表吧。在博客中,我们至少需要两个表:

  1. articles
  2. users

其中 articles 表中应该有一个 author_id 字段关联到 users 表中。那么问题来了,要不要在数据库层面定义物理外键呢?还是仅仅从逻辑上把两者关联起来。

使用物理外键的好处是可以少处理很多异常情况,因为数据库层面已经帮你解决了。但是在开发阶段,当需要删除数据的时候会有很繁琐的依赖问题。在生产环境中,外键也可能带来一些写入的性能问题。

如果只使用逻辑外键呢?那么就需要经常考虑有非法外键的情况,代码中要做好处理,不然就异常满天飞了。

关于数据库的第二个问题,要不要使用 ORM 呢?或许你已经习惯了使用 SQLAlchemy 或是 Hibernate 这类 ORM,并且认为这就是最佳实践。实际上,ORM 并不一定是最好的选择,选择 ORM 可能会有三个缺点:

  1. 意味着你在 SQL 语法之外还要另外学习一门 ORM 中包含的 DSL;
  2. ORM 也不能覆盖所有的 SQL 语句,很多时候你还是得手写 SQL;
  3. ORM 生成的 SQL 经常性能有问题,比如说经常就不小心 N+1 问题或者 select * 了。

甚至于,有的人认为 ORM 技术就是一团泥潭,堪比计算机界的越南战争(美国视角)

当然,不用 ORM 问题也很多,手工拼接 SQL 非常恶心不说,还容易出现 SQL 注入攻击的漏洞。

安全漏洞

正如刚刚提到的 SQL 注入攻击一样,作为一个生产环境的应用,面向大众开放,自然要考虑一些恶意攻击者的访问。

举个例子,你为了方便前端调用,在 API 中留下了一个 order_by=xxx 的参数,为了开发更灵活,这个参数直接对应到了数据库的字段而没有过滤。正常情况下,在你的客户端或者前端代码中,只是简单调用了一下 order_by=create_time,而这个字段是加了索引的,那么皆大欢喜。但是,恶意攻击者可不是这么想的,『我干嘛不调用一下没有索引的字段呢?』。比如说,发送大量的攻击请求到 order_by=first_name 上,那么很快你的数据库就可能被慢查询拖垮了。

这个例子蕴含了一个普遍的道理:灵活和安全不可得兼,你必须针对你的应用,选择一个恰当的地方做好折中。

上面的例子还没有涉及到数据,只是把网站搞挂了而已。另一个容易产生漏洞的地方,就是权限管理了。

假设你有一个用户更新自己文章的 API。那么就应该在其中校验用户传过来的 article_id 是不是他自己的文章。前端校验是不够的,假如恶意用户构造一个请求呢?如果不校验,就可能导致任意修改他人文章的漏洞。再比如,文章状态可能包含草稿和已发布,那么应该只有已发布的文章才可以浏览,但是用户也应该能预览和编辑自己未发布的草稿,你都需要判断是否有适当的权限。问题还可以变得更复杂一点,用户可能分为普通用户和管理员用户,管理员用户又可以查看所有的文章。在稍微大型一点的应用中,文章可能有不同的属性分类,用户更是可能有不同的角色,比如编辑、审核、管理员等等,文章的展现形式也可能多种多样,比如说是完全不可见,还是仅列表隐藏,还是仅列表可见标题,实际访问才会提示不可见/付费内容呢?

后台系统

前面提到了管理员用户,那么就引入了又一个问题——后台管理系统。对于一个博客系统的普通浏览者,看到的可能就是一篇篇的博客,但是对于博客的作者或者管理员来说,一定还有一个后台管理系统来写博客。后台管理系统需要有权限控制,对普通作者,需要限制访问后台的权限,比如说只能访问写作模块,对于管理员,则可以访问用户管理模块。一般来说,后台系统是一个复杂度不亚于前台的部分,这工作量一下就 double 了。现实中的应用,比如说新浪博客,可能还要复杂一些,至少分为三个部分。普通用户看到的博客页面,这部分在水面上,大家都能看到;博主撰写文章,管理自己文章的个人后台;而在新浪博客内部,一定还有一个审核文章,管理用户的内部后台,这工作量一下子就 triple 了。

你可能会说,django admin 这种自动生成的后台系统它不香么?等你尝试着添加一些 JS/CSS 或者是更改一下权限系统就知道了。有人专程写文章批判过:CRUD 代码生成的反模式典范:django admin

性能问题

等你把一切业务功能性的开发都搞定了,是时候考虑性能了。还是以博客系统为例,假如你有一个点赞的功能,那么根据学校里面教的数据库范式,这是一个典型的多对多关系啊,应该弄一个关联表,大概像这样:

users(n) <-> user_liked_articles <-> articles(n)

问题来了,如果首页要展现一个用户最喜欢的文章列表怎么办?在首页上有一个三个表的 join 操作对性能的影响是可想而知的。这时候至少有两种思路:

  1. 放弃正交化,在 article 表中增加一个 num_likes 字段,这样直接只查一个表就出来结果啦。缺点是要在代码中做好冗余信息的同步。
  2. 增加一个缓存,可以缓存整个首页,也可以缓存这条 SQL 的查询结果。缺点是你又得多一个组件,而且查询结果是有延迟的。

这已经是简单到不能再简单的一个小小性能问题了,真正的性能瓶颈可能需要通过使用 profile 工具或者是 wrk 这类的压测工具才能找出并修复。

部署

当你把性能问题也解决差不多的时候,终于到部署了。相对来说,部署还算中规中矩,没有多少幺蛾子。但是至少也要涉及到配置 nginx,申请 SSL 证书这些方面。

在 2021 年的今天,肯定得上 docker 吧,那还得写 dockerfile。如果进一步,要用 k8s 的话,还得写 deployments。或许你还需要配置一下 Jenkins 或者其他 CI 工具……

另外,静态文件怎么放也得考虑一下,就不说 CDN 或者优化图片大小了。至少开发时候你存放的目录得挪到 nginx 对应的目录吧?都是不疼不痒但是必须考虑的杂活儿。

结语

大概写到这里吧,还有好多问题没有涉及,包括但不限于:

  1. 搜索。总不能用 select * like %keyword% 吧?如果上 ES 的话,ES 的查询语法大概也得了解吧,还得考虑到配置从 SQL 数据库到 ES 的同步问题。
  2. 监控和日志。从两个方面来说,一是程序的性能监控,服务挂了得即使知道。另一方面是业务数据的监控,比如说 DAU 是多少。
  3. 测试。代码的单元测试,集成测试等等。
  4. 异步消息。比如说刚刚提到的点赞,被点赞的用户要不要收到一个通知呢?是否要发送邮件通知?邮件通知是不是该搞个消息队列异步操作?redis/kafka?又引入了一个新系统。
  5. 反爬。前面提到的恶意攻击都是以破坏数据为目的的。爬虫稍微好点,只是想把你的数据(全部)搞走。或许你会愿意搜索引擎赶紧收录你的博客文章,但是肯定不希望一个竞争对手把你的所有用户信息全都爬走,然后挨个邀请过去。举两个在反爬上很简单的错误:
    1. 暴露数据库的物理 id。假如你的数据库直接用的自增 id,并且把这个 id 暴露在 API 中,那好了,我直接遍历就完了,所有用户信息拿到。
    2. 没有频控。最起码也要针对 IP 做一个漏桶或者令牌桶的频控吧,不然爬虫流量消耗服务器资源都是很大的问题。

看到这里,希望你对一个系统的复杂和琐碎性能有一个大体的印象,就不要再问出

  1. 10 万块钱能不能做个淘宝?
  2. 4 个月能不能山寨个抖音?
  3. 不就是加个字段么,为啥还要排期?

这种奇葩问题了。总体来说,以上没有什么技术难点。但是每一个点都需要做出取舍折中,特别琐碎。要把每一个小事都考虑好了,还挺复杂的,工作量也不小,而且也不一定是一个人能搞定的。以上所有的这些还都只是后端的问题,另一半前端的问题还完全没有考虑……

对于前端,使用 React 还是 Vue。考虑到 SEO 的话,哪些页面还需要做静态化,这些都是需要考虑的众多问题之一……

最后,或许你还是看不起一个简单的 CRUD 的 web 应用。什么大数据啊、深度学习啊、高并发才是应该掌握的知识嘛!但是,要知道互联网的根基或者说起点从来都是一个简单的后端+前端。你首先做出一个有用的东西,有了用户,慢慢才会产生大数据,才会有高并发的需求。

本文像之前写的「爬虫思路」那篇文章一样,纯意识流瞎写,没有什么架构图,也没有任何参考文章。或许过几个月,我还会写一篇关于前端或者机器学习的文章吧。