一分钟学一个 Python 标准库之 Pathlib

相对于 os.path 来说,Pathlib 极大地简化了路径相关的操作。举个例子来说:

获取当前文件路径下的 default.yaml 文件

import os

os.path.join(os.path.dirname(__file__), "default.yaml")

只需要:

from pathlib import Path

Path(__file__).parent / "default.yaml"

不难看出,pathlib 有几个优点:

  1. 字符少了很多,出 bug 的地方就少了很多;
  2. 和操作系统一样,pathlib 直接使用路径分隔符 / 来操作,而不是 join 的两个参数;
  3. 非常符合直觉,从左到右阅读。而 os.path 使用函数参数,需要来回跳跃理解。相比之下,Pathlib 阅读起来很清晰。

Python3 标准库中所有接受 str 作为路径参数的地方,现在都可以接受一个 path 对象了。如果你使用的第三方库只接受 str 作为参数,
可以通过 path_str = str(path) 来转化一下。

Pathlib 的其他常用方法:

from pathlib import Path

cwd = Path.cwd()  # 获取当前目录
home = Path.home()  # 获取家目录,比如 /home/ubuntu
path = Path("/home/yifei")  # 创建一个新的 path 对象

path.is_dir()  # 是否是目录
path.is_file()  # 是否是普通文件
path.exists()  # 路径是否存在
path.resolve()  # 解析成绝对路径,比如 Path(".").resolve() 相当于 Path.cwd()

path.mkdir(parents=True, exists_ok=True) # 类似 mkdir -p

# 遍历目录
for child in path.iterdir():
    print(child)

# 除此之外,还有几个很甜的方法,省去了 with open 语句
path = Path("/home/ubuntu/readme.txt")
text = path.read_text()
path.write_text(text)

path = Path("/home/ubuntu/image.png")
image = path.read_bytes()
path.write_bytes(image)

>>> path
PosixPath('/home/ubuntu/test.md')
>>> path.name
'test.md'
>>> path.stem
'test'
>>> path.suffix
'.md'
>>> path.parent
PosixPath('/home/ubuntu')
>>> path.parent.parent
PosixPath('/home')
>>> path.anchor
'/'

在这个注意力涣散的年代,想要经常写一些长篇大论的帖子实在太难了,可能一个月才能憋出一篇。
不如分享写简单但是有用的知识点,或许还能经常更新。

以上就是全部内容啦。

参考

  1. https://realpython.com/python-pathlib/
  2. https://docs.python.org/3/library/pathlib.html

爬虫数据存储的一些选择

爬虫是一种典型的「读少写多」的应用场景。而且爬虫产生的数据一般是作为离线数据,而不是在线数据。
也就是说这些数据主要是用于离线数据分析,而不是直接供线上用户查询使用。
即使线上需要使用爬虫数据,也会需要经过清洗处理过后再放到在线库。总之,不会直接供线上直接使用。

本文总结了一些经验和踩过坑,希望能对读者有帮助。

写入通用原则

  1. 要批量写,不要挨个写
  2. 要顺序写,不要随机写

如果我们开了一个多线程的爬虫,然后每个线程每爬到一条数据就调一下 db.insert(item) 插入数据,数据一多就是灾难性的。

首先,每个线程持有一个数据库链接对数据库的负载就产生了不小压力。其次,每条数据都去调用数据库,那么每次插入时间都得
加上数据库的往返时间,也就是 2RTT(round trip time)。再者,每次插入的都是不同的数据,可能在磁盘的不同位置,导致
磁盘的写入时间大部分都花在寻道上了,磁盘 IO 时间会大幅提升。最后,如果是 SQL 型的数据库,默认配置下,
可能还会有事务的影响。

正确的做法是——用队列。不同的线程都把数据先发到一个队列中,不管是你用的语言自带的内存里的 Queue,还是 redis list,
或者是 kafka,都可以。然后由另一个线程或者脚本读取这个队列,把数据整理之后,定期或者定量写入数据库。

这样做基本上解决了上面提到的每个问题。只有存储线程持有数据库链接,每一条数据不会在需要 2RTT,摊薄下来可能是 2RTT / 1000,
数据经过整理后,每次可以都插入统一中类型的数据,磁盘不需要总是在寻道。

当然,这种方案也会引入一些新的问题,需要注意解决:

  1. 处理流程变成了异步,不能实时在数据中看到最新的爬取结果
  2. 多了消息队列的环节,多了丢数据的可能
  3. 如果消息队列是内存性的,不要让消息队列爆了

以上这些问题都是使用 MQ 的常见问题了,这里不再展开。

数据库选型

大概有这些选择:

  1. CSV、JSON、SQLite 或者其他单机文件
  2. SQL 数据库
  3. MongoDB 等文档数据库
  4. HBase 等 Hadoop 生态圈存储
  5. S3 类型对象存储
  6. Kafka 等持久性消息队列

如果你只是简单地「单线程」爬几页数据分析用,那么存个 CSV 或者 JSON 就可以了。如果你开始上多线程,甚至多机了,
既要考虑写入的时候加锁,而且没法分布式写入,单文件存储就不太合适了。

规模再大一点可以考虑 MySQL。虽然 MySQL 是一个 OLTP 数据库,通常意义上来说更适用于线上数据库。但是对于数据量不大的爬虫来说,
比如说总数据量不会超过 100GiB,也已经足够用了。而且 MySQL 可以添加 unique 索引,一定程度上还能帮助解决数据去重的问题。
这里有几点需要注意:

  1. 使用 ORM 创建的表中包含了不少外键约束之类的东西,对于爬来的数据,中间插入的时候可能还不满足这个外键,最好把这个约束删除掉
  2. 2.

有些人喜欢用 MongoDB 来存储爬虫数据,他们给出的理由也很有吸引力——爬虫数据多是半结构化的,而且数据结构可能经常跟着源网站变,
用 MongoDB 这种 schemaless 的文档数据库再合适不过了。然而我个人非常不推荐用 MongoDB,原因如下:

  1. 我觉得定义好表结构不是一个缺点,反而是一个优点,这样能够在开发调试阶段就发现各种异常情况,保证程序稳定
  2. MySQL 的图形化客户端太多了,比如说 Navicat,Sequel Pro 等等。对于小公司来说,这个客户端就已经够用了,根本不需要开发什么单独的管理后台
    相反,MongoDB 基本没有什么特别好用的客户端
  3. MySQL 和 Postgres 也早就支持了 JSON 字段,实在不是特别规整的数据,存在 JSON 字段就行了
  4. 数据分析的第三方库,比如 pandas,对 SQL 的支持都是原生的,一个 read_sql 就把数据读出来了

数据再多一些,或者并发量再大一些的话,可能单独使用 MySQL 就不合适了。这时候你可以对 MySQL 做定期归档,比如说把添加时间在一个月以上的数据
都按日期写入到 Hive 或者 S3 中,然后删除掉 MySQL 中的数据。这样的做法,其实相当于隐式地把 MySQL 作为了一个消息队列,并起到了缓冲的作用。

再者,MySQL 这种毕竟是行式数据库,如果你的数据数值居多,也可以跳过 MySQL,考虑直接存储到列式数据库中。下游的 Spark,Flink 这些消费端可能更喜欢读取列式数据。

最后一种选项是直接使用 Kafka 这种持久化的消息队列作为存储。DDIA 这本书中提到一个有趣的观点:数据库是日志的积分,日志是数据库的导数。
从某种意义上来说,两者所含有的信息是等价的,可以相互转换。所以直接使用消息队列作为数据存储也未尝不可。

总之,对于爬虫这种场景来说,最重要的特点是「读少写多」,按照这个思路去选择问题不大。除了这里提到的一些数据库,还有 Cassandra,FoundationDB 等一些
数据库没有提到,在特定的场景下也都值得考虑。对于存储的选择也不只是一个技术问题,可能更重要的是你的公司现在有什么,选一个比较合适的用就好了。

参考

  1. https://www.zhihu.com/question/479761564
  2. https://www.zhihu.com/question/36110917

Kubernetes 定时任务

参照官方文档,直接比着写就行了:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"  # 也支持 @hourly 这些语法
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            imagePullPolicy: IfNotPresent
            command:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

列出所有 cronjob

kubectl get cronjob [JOB_NAME]

NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     0        <none>          10s

列出 cronjob 所有运行过的实例:

kubectl get jobs --watch

NAME               COMPLETIONS   DURATION   AGE
hello-4111706356   0/1                      0s
hello-4111706356   0/1           0s         0s
hello-4111706356   1/1           5s         5s

需要注意的一点,也是 Linux 上的 cron 命令忽视的一点,K8s 提供了 .spec.concurrentPolicy 选项,
用来选择当上一任务还没有执行完毕时,如何处理并发。

  • Allow,默认选项,只要到了时间就触发,并发执行
  • Forbid,如果上一个任务没有执行完毕,忽略本次任务
  • Replace,如果上一个任务没有执行完毕,用新任务替换掉老任务

参考

  1. https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/

SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data

写一个 CRUD 还挺难的

让我们只从后端角度出发,考虑写一个简单的博客系统会有哪些问题。这篇文章谈论的并不是某个 Web 框架的 TODO list demo 之类的东西,那都是玩具性质的,而是会谈一谈生产环境中的要考虑的一些实际问题。本文中,我们也不会涉及到像是 MySQL 的几种隔离模式或者是 Kafka 是不是 Exactly Once 这种后端面试常问的八股文,而是从全局考虑一些简单但是又避不开的繁琐问题。

数据库

首先,需要定义一下数据库的表吧。在博客中,我们至少需要两个表:

  1. articles
  2. users

其中 articles 表中应该有一个 author_id 字段关联到 users 表中。那么问题来了,要不要在数据库层面定义物理外键呢?还是仅仅从逻辑上把两者关联起来。

使用物理外键的好处是可以少处理很多异常情况,因为数据库层面已经帮你解决了。但是在开发阶段,当需要删除数据的时候会有很繁琐的依赖问题。在生产环境中,外键也可能带来一些写入的性能问题。

如果只使用逻辑外键呢?那么就需要经常考虑有非法外键的情况,代码中要做好处理,不然就异常满天飞了。

关于数据库的第二个问题,要不要使用 ORM 呢?或许你已经习惯了使用 SQLAlchemy 或是 Hibernate 这类 ORM,并且认为这就是最佳实践。实际上,ORM 并不一定是最好的选择,选择 ORM 可能会有三个缺点:

  1. 意味着你在 SQL 语法之外还要另外学习一门 ORM 中包含的 DSL;
  2. ORM 也不能覆盖所有的 SQL 语句,很多时候你还是得手写 SQL;
  3. ORM 生成的 SQL 经常性能有问题,比如说经常就不小心 N+1 问题或者 select * 了。

甚至于,有的人认为 ORM 技术就是一团泥潭,堪比计算机界的越南战争(美国视角)

当然,不用 ORM 问题也很多,手工拼接 SQL 非常恶心不说,还容易出现 SQL 注入攻击的漏洞。

安全漏洞

正如刚刚提到的 SQL 注入攻击一样,作为一个生产环境的应用,面向大众开放,自然要考虑一些恶意攻击者的访问。

举个例子,你为了方便前端调用,在 API 中留下了一个 order_by=xxx 的参数,为了开发更灵活,这个参数直接对应到了数据库的字段而没有过滤。正常情况下,在你的客户端或者前端代码中,只是简单调用了一下 order_by=create_time,而这个字段是加了索引的,那么皆大欢喜。但是,恶意攻击者可不是这么想的,『我干嘛不调用一下没有索引的字段呢?』。比如说,发送大量的攻击请求到 order_by=first_name 上,那么很快你的数据库就可能被慢查询拖垮了。

这个例子蕴含了一个普遍的道理:灵活和安全不可得兼,你必须针对你的应用,选择一个恰当的地方做好折中。

上面的例子还没有涉及到数据,只是把网站搞挂了而已。另一个容易产生漏洞的地方,就是权限管理了。

假设你有一个用户更新自己文章的 API。那么就应该在其中校验用户传过来的 article_id 是不是他自己的文章。前端校验是不够的,假如恶意用户构造一个请求呢?如果不校验,就可能导致任意修改他人文章的漏洞。再比如,文章状态可能包含草稿和已发布,那么应该只有已发布的文章才可以浏览,但是用户也应该能预览和编辑自己未发布的草稿,你都需要判断是否有适当的权限。问题还可以变得更复杂一点,用户可能分为普通用户和管理员用户,管理员用户又可以查看所有的文章。在稍微大型一点的应用中,文章可能有不同的属性分类,用户更是可能有不同的角色,比如编辑、审核、管理员等等,文章的展现形式也可能多种多样,比如说是完全不可见,还是仅列表隐藏,还是仅列表可见标题,实际访问才会提示不可见/付费内容呢?

后台系统

前面提到了管理员用户,那么就引入了又一个问题——后台管理系统。对于一个博客系统的普通浏览者,看到的可能就是一篇篇的博客,但是对于博客的作者或者管理员来说,一定还有一个后台管理系统来写博客。后台管理系统需要有权限控制,对普通作者,需要限制访问后台的权限,比如说只能访问写作模块,对于管理员,则可以访问用户管理模块。一般来说,后台系统是一个复杂度不亚于前台的部分,这工作量一下就 double 了。现实中的应用,比如说新浪博客,可能还要复杂一些,至少分为三个部分。普通用户看到的博客页面,这部分在水面上,大家都能看到;博主撰写文章,管理自己文章的个人后台;而在新浪博客内部,一定还有一个审核文章,管理用户的内部后台,这工作量一下子就 triple 了。

你可能会说,django admin 这种自动生成的后台系统它不香么?等你尝试着添加一些 JS/CSS 或者是更改一下权限系统就知道了。有人专程写文章批判过:CRUD 代码生成的反模式典范:django admin

性能问题

等你把一切业务功能性的开发都搞定了,是时候考虑性能了。还是以博客系统为例,假如你有一个点赞的功能,那么根据学校里面教的数据库范式,这是一个典型的多对多关系啊,应该弄一个关联表,大概像这样:

users(n) <-> user_liked_articles <-> articles(n)

问题来了,如果首页要展现一个用户最喜欢的文章列表怎么办?在首页上有一个三个表的 join 操作对性能的影响是可想而知的。这时候至少有两种思路:

  1. 放弃正交化,在 article 表中增加一个 num_likes 字段,这样直接只查一个表就出来结果啦。缺点是要在代码中做好冗余信息的同步。
  2. 增加一个缓存,可以缓存整个首页,也可以缓存这条 SQL 的查询结果。缺点是你又得多一个组件,而且查询结果是有延迟的。

这已经是简单到不能再简单的一个小小性能问题了,真正的性能瓶颈可能需要通过使用 profile 工具或者是 wrk 这类的压测工具才能找出并修复。

部署

当你把性能问题也解决差不多的时候,终于到部署了。相对来说,部署还算中规中矩,没有多少幺蛾子。但是至少也要涉及到配置 nginx,申请 SSL 证书这些方面。

在 2021 年的今天,肯定得上 docker 吧,那还得写 dockerfile。如果进一步,要用 k8s 的话,还得写 deployments。或许你还需要配置一下 Jenkins 或者其他 CI 工具……

另外,静态文件怎么放也得考虑一下,就不说 CDN 或者优化图片大小了。至少开发时候你存放的目录得挪到 nginx 对应的目录吧?都是不疼不痒但是必须考虑的杂活儿。

结语

大概写到这里吧,还有好多问题没有涉及,包括但不限于:

  1. 搜索。总不能用 select * like %keyword% 吧?如果上 ES 的话,ES 的查询语法大概也得了解吧,还得考虑到配置从 SQL 数据库到 ES 的同步问题。
  2. 监控和日志。从两个方面来说,一是程序的性能监控,服务挂了得即使知道。另一方面是业务数据的监控,比如说 DAU 是多少。
  3. 测试。代码的单元测试,集成测试等等。
  4. 异步消息。比如说刚刚提到的点赞,被点赞的用户要不要收到一个通知呢?是否要发送邮件通知?邮件通知是不是该搞个消息队列异步操作?redis/kafka?又引入了一个新系统。
  5. 反爬。前面提到的恶意攻击都是以破坏数据为目的的。爬虫稍微好点,只是想把你的数据(全部)搞走。或许你会愿意搜索引擎赶紧收录你的博客文章,但是肯定不希望一个竞争对手把你的所有用户信息全都爬走,然后挨个邀请过去。举两个在反爬上很简单的错误:
    1. 暴露数据库的物理 id。假如你的数据库直接用的自增 id,并且把这个 id 暴露在 API 中,那好了,我直接遍历就完了,所有用户信息拿到。
    2. 没有频控。最起码也要针对 IP 做一个漏桶或者令牌桶的频控吧,不然爬虫流量消耗服务器资源都是很大的问题。

看到这里,希望你对一个系统的复杂和琐碎性能有一个大体的印象,就不要再问出

  1. 10 万块钱能不能做个淘宝?
  2. 4 个月能不能山寨个抖音?
  3. 不就是加个字段么,为啥还要排期?

这种奇葩问题了。总体来说,以上没有什么技术难点。但是每一个点都需要做出取舍折中,特别琐碎。要把每一个小事都考虑好了,还挺复杂的,工作量也不小,而且也不一定是一个人能搞定的。以上所有的这些还都只是后端的问题,另一半前端的问题还完全没有考虑……

对于前端,使用 React 还是 Vue。考虑到 SEO 的话,哪些页面还需要做静态化,这些都是需要考虑的众多问题之一……

最后,或许你还是看不起一个简单的 CRUD 的 web 应用。什么大数据啊、深度学习啊、高并发才是应该掌握的知识嘛!但是,要知道互联网的根基或者说起点从来都是一个简单的后端+前端。你首先做出一个有用的东西,有了用户,慢慢才会产生大数据,才会有高并发的需求。

本文像之前写的「爬虫思路」那篇文章一样,纯意识流瞎写,没有什么架构图,也没有任何参考文章。或许过几个月,我还会写一篇关于前端或者机器学习的文章吧。

放弃 requests,拥抱 httpx

httpx 是新一代的 Python http 请求库,它几乎和 requests 的 API 无缝兼容,几乎不用改代码。相对于 requests 来说,有以下优点:

  • 支持 asyncio, 可以直接 async/await 啦
  • 支持 http 2, requests 一直都不支持
  • 实现了正确的 http 代理
  • Cookie 的 API 更友好
  • 有自己的网络连接池,requests 是基于第三方的 urllib3
  • 文档更有条理,更深入

还有就是 requests 有比较明显的内存泄漏,目前我还没有测试过 httpx,所以就不列到优点里了。

httpx 不支持在 client.request 中使用 proxies,必须在 Client 初始化时候指定,这样做应该是考虑到链接池的实现。

Axios 的基本使用

fetch 虽然在现代浏览器中已经支持很好了,但是 API 还是没有那么好用。比如说增加 GET 参数必须使用 URLSearchParm 这个类。相比之下,Axios 的 API 还是更接近直觉一些。

import axios from 'axios'

axios(url, [config]);
axios(config);
axios.get(url, [config])
axios.delete(url, [config])
axios.put(url, data, [config])
axios.post(url, data, [config])

// 请求中 config 的属性
const config = {
    headers: {}, // Headers
    params: {},  // GET 参数
    data: {},  // 默认情况下 {} 会按照 JSON 发送
    timeout: 0, // 默认是没有 timeout 的
}

// 如果要发送传统的 POST 表单
const config = {
    data: new FormData()
}

// headers 会根据 data 是 json 还是表单自动设置,非常贴心。

// 响应的属性
let res = await axios.get("api")
res = {
    data: {},  // axios 会自动 JSON.parse
    status: 200,
    statusText: "OK",
    headers: {},
    config: {},
    request: {}
}
// 和 fetch 不同的是,res.data 可以直接使用,而 fetch 还需要 await res.json()

// 如果要添加一些默认的设置,使用
axios.defaults.headers.common["X-HEADER"] = "value"
axios.defaults.timeout = 3000
// 具体参见这里: https://github.com/axios/axios/blob/master/lib/defaults.js#L28

重定向与其他错误状态码

对于 3XX 重定向代码,axios 只能跟中,这是浏览器决定的,不是 axios 可以改变的。
对于 4XX 和 5XX 状态码,axios 会抛出异常,可以 catch 住。

上传文件

如果一个表单中包含的文件字段,那么传统的方法是把这个字段做为表单的一部分上传。然而现代的 API 大多是 json 接口,并没有 POST 表单这种格式。
这时候可以单独把上传作为一个接口,通过 POST 表单的方式上传,然后返回上传后的路径。在对应的 API 的接口中附件的字段填上这个路径就好了。

参考

  1. https://stackoverflow.com/questions/4083702/posting-a-file-and-associated-data-to-a-restful-webservice-preferably-as-json

Boto3 访问 S3 的基本用法

以前我以为文档坑爹只有一种方式,那就是没有文档。最近在用 boto3, 才让我认识到了文档的另一种坑爹方式:屁话太多。

具体来说是这样的:Boto3 中有两种 API, 低级和高级。其中低级 API 是和 AWS 的 HTTP 接口一一对应的,
通过 boto3.client(“xxx”) 暴露。高级接口是面向对象的,更加易于使用,通过 boto3.resource(“xxx”) 暴露,
美中不足是不一定覆盖了所有的 API.

坑爹的 AWS 文档中,经常混用 resource 和 client 两套接口,也没有任何提示,文档的首页除了简单的提了一句有两套 API 外再没有单独的介绍了。
在没写这篇文章之前,我的脑子里都是乱的,总觉得 S3(Simple Storage Service) 的这个狗屁接口哪里配得上 Simple 这个词,
一会儿是 listobject, 一会儿是 listobject_v2 的。相反,高级 API 是很简单易用的,然而这样一个简单的 API 被深深地埋在了一大堆的低级 API 中,
网上的文章也是一会儿 boto3.client, 一会儿 boto3.resource. 除了有人特意提问两者的区别,很难看到有人说这俩到底是啥。

吐槽完毕。

最近总是用到 S3, 在这里记录一下 Boto3 的简单用法了。Boto3 是整个 AWS 的 SDK, 而不只是包括 S3. 还可以用来访问 SQS, EC2 等等。

如果没有特殊需求的话,建议使用高级 API. 本文一下就记录一些 boto3.resource("s3") 的例子。

import boto3

import boto3

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
)

# s3 = boto3.resource("s3")
s3 = session.resouce("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 删除一些对象
bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'
            },
        ],
        'Quiet': True|False
    },
)
# 返回结果
{
    'Deleted': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'DeleteMarker': True|False,
            'DeleteMarkerVersionId': 'string'
        },
    ],
    'RequestCharged': 'requester',
    'Errors': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'Code': 'string',
            'Message': 'string'
        },
    ]
}

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 下载到文件对象,可能会自动开启多线程下载
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# 上传文件
object = bucket.put_object(Body=b"data"|file, ContentMD5="", Key="xxx")

# 这个方法会自动开启多线程上传
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容
obj.put(Body=b"xxx"|file, ContentMD5="")

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

如果想进一步了解,建议直接点开参考文献 2, 阅读下 resouce 相关接口的文档,其他的低级 client 接口完全可以不看。

参考

  1. https://stackoverflow.com/questions/42809096/difference-in-boto3-between-resource-client-and-session
  2. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource

对比 Flask,学习 FastAPI

本文假定你对 Flask 等 web framework 已经比较熟悉。不过多解释为什么要这么做,直接上干货。FastAPI 的一些优势大概有:类型检查、自动 swagger 文档、支持 asyncio、强大的依赖注入系统,我在 这个回答 里具体写过,就不展开了。

安装

注意一定要加上这个 [all], 免得后续缺什么依赖。

pip install fastapi[all]

Hello, World

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def hello():
    return "Hello, World!"

在命令行开启调试服务器:

uvicorn main:app --reload --port 8000

然后访问 http://localhost:8000 就可以看到 hello world 啦。更妙的是,访问 http://localhost:8000/docs 就可以看到自动生成,可以交互的 Swagger UI 文档了。

注意在上面的装饰器中,直接使用了 app.get 来定义 GET 访问的路径,其他的 post/put/delete 也都同理。Flask 在 1.x 中还必须使用 app.route,在 2.0 中才赶了上来。

同步和异步

# 同步函数
@app.get("/sync")
def hello():
    return "Hello, World!"

# 异步函数
@app.get("/async")
async def hello2():
    return "Hello, World!"

FastAPI 原生支持同时同步和异步函数,而且是完全一样的用法。使用异步函数肯定性能最好,但是有好多库,尤其是访问数据库的库用不了。使用同步函数的话,也不会把整个 Event Loop 锁死,而是会在线程池中执行。所以我个人的建议是,如果你已经习惯了写 Async Python,那么完全可以全部都写异步函数。如果你刚刚开始接触 asyncio 这一套,那么完全可以大部分都写同步函数,通过 profile 找到调用最多,或者对性能影响最大的地方,把这部分切换到异步函数。

请求参数

废话不多说,看请求参数都有哪些。

路径参数

@app.get("/{name})
def hello(name: str):
    return "Hello " + name

访问 http://127.0.0.1:8000/docs 就可以看到 OpenAPI 的文档了。

如果在文档中需要使用 Enum 的话,那么创建一个 Enum 的子类:

from enum import Enum

class ModelName(str, Enum):
   alexnet = "alexnet"
   resnet = "resnet"

@app.get("/models/{model_name}")
def get_model(model_name: ModelName):
    ...

值得注意的一点是,对于结尾的 “/”, FastAPI 不会强制要求匹配。如果你的请求和定义的不一样,那么 FastAPI 会 307 重定向到定义的那一个。比如定义了 app.get("/items/"), 而请求是 requests.get("/items"), 那么会 307 到 /items/. 但是如果部署在代理后面,重定向的地址可能不对,这个可能会引起问题,建议还是前后端统一一下。

另一个需要注意的地方,顺序是很重要的,FastAPI 按照定义的顺序来匹配路由。比如说你定义了两个路径 /users/me/users/{id}, 那么,一定要把 /users/me 放在前边,否则就会匹配到 users/{id}.

url query 参数

GET 参数是 FastAPI 和 Flask 不一样的地方。FastAPI 中统一用函数参数获取,这样也是为了更好地实现类型检查:

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

# GET /items/?skip=100&limit=10
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    return fake_items_db[skip : skip + limit]

对于 bool 类型,URL 中的 1/true/True/on/yes 都会被转化为 True, 其他都是 False.

http body 参数

POST 参数统一使用 json, 需要使用 PyDantic 的 BaseModel 来定义,只要参数是 BaseModel 类型的,FastAPI 就会从 body 中获取:

from pydantic import BaseModel

class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None

@app.post("/items/")
async def create_item(item: Item):
    return item

@app.put("/items/{item_id}")
async def create_item(item_id: int, item: Item):
    return {"item_id": item_id, **item.dict()}

如果不想定义 Model 验证,只想直接读取请求,那么可以使用 Request 对象,不过 Request 对象只有 async 版本,要想在同步函数中使用,需要一点点 work-around,这算是一个小小的坑吧:

from fastapi import Request, Depends

async def get_json(req: Request):
    return await req.json()

@app.get("/", response_model=None)
def get(req=Depends(get_json)):
    print(req)

其中用得到的 Depends 是 FastAPI 中非常强大的依赖注入系统,后面有时间了再详细讲。

Request 的文档详见:https://www.starlette.io/requests/, 参考:https://github.com/tiangolo/fastapi/issues/2574

Cookie 参数

使用 Cookie 来定义一个从 Cookie 中读取的参数:

from fastapi import Cookie, FastAPI

@app.get("/items")
def read_items(ads_id: Optional[str] = Cookie(None)):
    return {"ads_id": ads_id}

这里有个坑爹的地方,限于浏览器的安全模型,在生成的 Swagger UI 中 Cookie 参数是不能使用的。这是上游 Swagger/OpenAPI 项目的 bug(https://github.com/tiangolo/fastapi/issues/880), 所以在 FastAPI 这里也无解。

一个简单的 work-around 是:如果你有生成 Cookie 的 API, 那么先调用一下生成 Cookie 的 API, 再调用需要 Cookie 认证的 API 的时候就会自动附上这个 Cookie.

Header 参数

和 Cookie 是类似的。FastAPI 会自动把 “User-Agent” 转化成 user_agent 的形式。

from fastapi import FastAPI, Header

@app.get("/items/")
async def read_items(user_agent: Optional[str] = Header(None)):
    return {"User-Agent": user_agent}

参数验证

如果需要对参数进行验证,需要使用 Query, Path 等对象来指定,分别对应 GET 参数和路径:

from fastapi import FastAPI, Query, Path

@app.get("/items")
def read_items(q: Optional[str] = Query(None, min_length=3, max_length=50, regex="^fixedquery$"))
    ...

# 如果不使用默认值的话,可以使用特殊对象 `...`
def read_items(q: Optional[str] = Query(..., max_length=50))
# 数字可以使用 lt/le 或者 gt/ge 定义
def read_items(item_id: int = Path(..., gt=0, lt=100))

响应参数

需要在 app.get 中的 response_model 参数指定响应的 BaseModel:

from pydantic import BaseModel

class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
    tags: List[str] = []

@app.post("/items/", response_model=Item)
async def create_item(item: Item):
    return item

请求的 body 和响应的 body 经常公用一个 Model, 有些敏感信息可能不适合在响应的 body 中返回,这时候可以使用 response_model_includeresponse_model_exclude 来指定需要包含或者需要过滤出去的字段。

@app.get(
    "/items/{item_id}/name",
    response_model=Item,
    response_model_include=["name", "description"],
    response_model_exclued=["password"]
)
async def read_item_name(item_id: str):
    return items[item_id]

使用 status_code 参数可以更改返回的状态码:

@app.get("/", status_code=201)
def get():
    ...

更新 Cookie

使用 Reponse 对象的 set_cookiedelete_cookie 方法实现增删 Cookie. 注意的是,只需要对 response 对象进行操作,不需要 return response 对象。

from fastapi import FastAPI, Response

@app.post("/cookie-and-object/")
def create_cookie(response: Response):
    response.set_cookie(key="fakesession", value="fake-cookie-session-value")
    response.delete_cookie(key)
    return {"message": "Come to the dark side, we have cookies"}

产生错误

直接 raise HTTPException 就好了

from fastapi import HTTPException

@app.get("/")
def hello():
    raise HTTPException(status_code=404, detail="item not found")

使用路由模块化

FastAPI 中的 router 相当于 Flask 中的 Blueprint, 用来切分应用到不同的模块。

# views/users.py
from fastapi import APIRouter

router = APIRouter()

@router.get("/me")
def myinfo():
    ...

# main.py
from fastapi import FastAPI
from views import users

app = FastAPI()

app.include_router(users.router, prefix="/users")

就到这里吧,简单写了下基本的请求响应和 Router 的使用,这是这个系列的第一篇,敬请期待后续。最后说一句,FastAPI 的官方文档非常详尽,值得一看。不过对于已经有 web 开发知识的人来说稍显啰嗦。

访问日志

FastAPI 默认的日志似乎有一些问题,不会显示出访问日志来

参考

FastAPI 官方文档

Playwright: 比 Puppeteer 更好用的浏览器自动化工具

在 Playwright 之前,我一般会使用 Selenium 或者 Puppeteer 来进行浏览器自动化操作。然而,Selenium 经常会有一些奇怪的 bug, Puppeteer 则是没有官方 Python 版,非官方版本也只有 async 版本,并且也是有一些奇怪的 bug. 另外,众所周知,Python 的 Async API 并不是那么好用。

Playwright 是微软出品的浏览器自动化工具,大厂开源作品,代码质量应该是有足够保证的。而且它还官方支持同步版的 Python API, 同时支持三大浏览器,所以赶紧切换过来了。

特别注意 Playwright 的拼写,别把中间的 “w” 丢了。

安装

pip install playwright==1.8.0a1  # 很奇怪,必须指定版本,不指定会安装到一个古老的版本
python -m playwright install  # 安装浏览器,此处国内网络可能会有问题(你懂的),请自行解决

基本使用

Playwright 支持 Firefox/Chrome/WebKit(Safari). 其中 webkit 最轻量了,所以没有什么特殊需求最好使用 webkit, 不要使用 chromium.

from playwright.sync_api import sync_playwright as playwright

with playwright() as pw:
    webkit = pw.webkit.launch(headless=False)
    context = webkit.new_context(
        extra_http_headers={},
        ignore_https_erros=True,
        proxy={"server": "http://proxy.com"},
        user_agent="Mozilla/5.0...",
        viewport={"height": 1280, "width": 800}
    )  # 需要创建一个 context
    page = context.new_page()  # 创建一个新的页面
    page.goto("https://www.apple.com")
    print(page.content())
    webkit.close()

Playwright 官方推荐使用 with 语句来访问,不过如果你不喜欢的话,也可以用 pw.start() 和 pw.stop().

pw = playwright().start()
pw.webkit.launch()

Context 和 Page 没有提供 with 语句来确保关闭,可以使用 with closing 来实现,或者直接用 try…finally

from contextlib import closing

with closing(webkit.new_context()) as context:
    page = context.new_page()

context = webkit.new_context()
try:
    ...
finally:
    context.close()

新概念:Context

和 Puppeteer 不同的是,Playwright 新增了 context 的概念,每个 context 就像是一个独立的匿名模式会话,非常轻量,但是又完全隔离。比如说,可以在两个 context 中登录两个不同的账号,也可以在两个 context 中使用不同的代理。

通过 context 还可以设置 viewport, user_agent 等。

context = browser.new_context(
  user_agent='My user agent'
)
context = browser.new_context(
  viewport={ 'width': 1280, 'height': 1024 }
)
context = browser.new_context(
    http_credentials={"username": "bill", "password": "pa55w0rd"}
)

# new_context 其他几个比较有用的选项:
ignore_https_errors=False
proxy={"server": "http://example.com:3128", "bypass": ".example.com", "username": "", "password": ""}
extra_http_headers={"X-Header": ""}

context 中有一个很有用的函数context.add_init_script, 可以让我们设定在调用 context.new_page 的时候在页面中执行的脚本。

# hook 新建页面中的 Math.random 函数,总是返回 42
context.add_init_script(script="Math.random = () => 42;")
# 或者写在一个文件里
context.add_init_script(path="preload.js")

还可以使用 context.expose_bindingcontext.expose_function 来把 Python 函数暴露到页面中,不过个人感觉还是使用 addinitscript 暴露 JS 函数方便一些。

和 Puppeteer 一样,Playwright 的核心概念依然是 page, 核心 API 几乎都是 page 对象的方法。可以通过 context 来创建 page.

页面基本操作

按照官网文档,调用 page.goto(url) 后页面加载过程:

  1. 设定 url
  2. 通过网络加载解析页面
  3. 触发 page.on(“domcontentloaded”) 事件
  4. 执行页面的 js 脚本,加载静态资源
  5. 触发 page.on(“laod”) 事件
  6. 页面执行动态加载的脚本
  7. 当 500ms 都没有新的网络请求的时候,触发 networkidle 事件

page.goto(url) 会跳转到一个新的链接。默认情况下 Playwright 会等待到 load 状态。如果我们不关心加载的 CSS 图片等信息,可以改为等待到 domcontentloaded 状态,如果页面是 ajax 加载,那么我们需要等待到 networkidle 状态。如果 networkidle 也不合适的话,可以采用 page.waitforselector 等待某个元素出现。不过对于 click 等操作会自动等待。

page.goto(url, referer="", timeout=30, wait_until="domcontentloaded|load|networkidle")

Playwright 会自动等待元素处于可操作的稳定状态。当然也可以用 page.wait_for_* 函数来手工等待:

page.wait_for_event("event", event_predict, timeout)
page.wait_for_function(js_function)
page.wait_for_load_state(state="domcontentloaded|load|networkidle", timeout)
page.wait_for_selector(selector, timeout)
page.wait_for_timeout(timeout)  # 不推荐使用

对页面的操作方法主要有:

# selector 指的是 CSS 等表达式
page.click(selector)
page.fill(selector, value)  # 在 input 中填充值

# 例子
page.click("#search")

# 模拟按键
page.keyboard.press("Z")
# 支持的按键名字
# F1 - F12, Digit0- Digit9, KeyA- KeyZ, Backquote, Minus, Equal, Backslash, Backspace, Tab, Delete, Escape, ArrowDown, End, Enter, Home, Insert, PageDown, PageUp, ArrowRight, ArrowUp
# 如果是单个字母,那么是大小写敏感的,比如 a 和 A 发送的是不同的按键
# 组合按键
page.keyboard.press("Shift+A")  # Shift, Ctrl, Meta, Alt, 其中 Meta 应该是相当于 Win/Cmd 键
# 其他的按键参考这里:https://playwright.dev/python/docs/api/class-keyboard

# 模拟输入
page.keyboard.type("hello")

在编写爬虫的过程中,向下滚动触发数据加载是个很常见的操作。但是如果想要向下滚动页面的话,似乎现在没有特别好的方法。本来我以为会有 page.scroll 这种方法的,然而并没有。在没有官方支持之前,暂时可以使用 page.keyboard.press("PageDown") 来向下滚动。

获取页面中的数据的主要方法有:

page.url  # url
page.title()  # title
page.content()  # 获取页面全文
page.inner_text(selector)  # element.inner_text()
page.inner_html(selector)
page.text_content(selector)
page.get_attribute(selector, attr)

# eval_on_selector 用于获取 DOM 中的值
page.eval_on_selector(selector, js_expression)
# 比如:
search_value = page.eval_on_selector("#search", "el => el.value")

# evaluate 用于获取页面中 JS 中的数据,比如说可以读取 window 中的值
result = page.evaluate("([x, y]) => Promise.resolve(x * y)", [7, 8])
print(result) # prints "56"

选择器表达式

在上面的代码中,我们使用了 CSS 表达式(比如#button)来选取元素。实际上,Playwright 还支持 XPath 和自己定义的两种简单表达式,并且是自动识别的。

自动识别实际上是有一点点坑的,凡是 . 开头的都会被认为是 CSS 表达式,比如说 .close, .pull-right 等等,但是 .//a 这种也会被认为是 CSS 导致选择失败,所以使用 XPath 的时候就不要用 . 开头了。

# 通过文本选择元素,这是 Playwright 自定义的一种表达式
page.click("text=login")

# 直接通过 id 选择
page.click("id=login")

# 通过 CSS 选择元素
page.click("#search")
# 除了常用的 CSS 表达式外,Playwright 还支持了几个新的伪类
# :has 表示包含某个元素的元素
page.click("article:has(div.prome)")
# :is 用来对自身做断言
page.click("button:is(:text('sign in'), :text('log in'))")
# :text 表示包含某个文本的元素
page.click("button:text('Sign in')")  # 包含
page.click("button:text-is('Sign is')")  # 严格匹配
page.click("button:text-matches('\w+')")  # 正则
# 还可以根据方位匹配
page.click("button:right-of(#search)")  # 右边
page.click("button:left-of(#search)")  # 左边
page.click("button:above(#search)")  # 上边
page.click("button:below(#search)")  # 下边
page.click("button:near(#search)")  # 50px 之内的元素

# 通过 XPath 选择
page.click("//button[@id='search'])")
# 所有 // 或者 .. 开头的表达式都会默认为 XPath 表达式

对于 CSS 表达式,还可以添加前缀css=来显式指定,比如说 css=.login 就相当于 .login.

除了上面介绍的四种表达式以外,Playwright 还支持使用 >> 组合表达式,也就是混合使用四种表达式。

page.click('css=nav >> text=Login')

复用 Cookies 等认证信息

在 Puppeteer 中,复用 Cookies 也是一个老大难问题了。这个是 Playwright 特别方便的一点,他可以直接导出 Cookies 和 LocalStorage, 然后在新的 Context 中使用。

# 保存状态
import json
storage = context.storage_state()
with open("state.json", "w") as f:
    f.write(json.dumps(storage))

# 加载状态
with open("state.json") as f:
    storage_state = json.loads(f.read())
context = browser.new_context(storage_state=storage_state)

监听事件

通过 page.on(event, fn) 可以来注册对应事件的处理函数:

def log_request(intercepted_request):
    print("a request was made:", intercepted_request.url)
page.on("request", log_request)
# sometime later...
page.remove_listener("request", log_request)

其中比较重要的就是 request 和 response 两个事件

拦截更改网络请求

可以通过 page.on(“request”) 和 page.on(“response”) 来监听请求和响应事件。

from playwright.sync_api import sync_playwright as playwright

def run(pw):
    browser = pw.webkit.launch()
    page = browser.new_page()
    # Subscribe to "request" and "response" events.
    page.on("request", lambda request: print(">>", request.method, request.url))
    page.on("response", lambda response: print("<<", response.status, response.url))
    page.goto("https://example.com")
    browser.close()

with playwright() as pw:
    run(pw)

其中 request 和 response 的属性和方法,可以查阅文档:https://playwright.dev/python/docs/api/class-request

通过 context.route, 还可以伪造修改拦截请求等。比如说,拦截所有的图片请求以减少带宽占用:

context = browser.new_context()
page = context.new_page()
# route 的参数默认是通配符,也可以传递编译好的正则表达式对象

# 1
context.route("**/*.{png,jpg,jpeg}", lambda route: route.abort())
# 2
context.route(re.compile(r"(\.png$)|(\.jpg$)"), lambda route: route.abort())
# 3
def no_static(route, req):
    if req.resource_type in {"image", "stylesheet", "media", "font"}:
        route.abort()
    else:
        route.continue_()
context.route("**/*", no_static)
page.goto("https://example.com")

其中 route 对象的相关属性和方法,可以查阅文档:https://playwright.dev/python/docs/api/class-route

灵活设置代理

Playwright 还可以很方便地设置代理。Puppeteer 在打开浏览器之后就无法在更改代理了,对于爬虫类应用非常不友好,而 Playwright 可以通过 Context 设置代理,这样就非常轻量,不用为了切换代理而重启浏览器。

context = browser.new_context(
    proxy={"server": "http://example.com:3128", "bypass": ".example.com", "username": "", "password": ""}
)

杀手级功能:录制操作直接生成代码

Playwright 的命令行还内置了一个有趣的功能:可以通过录制你的点击操作,直接生成 Python 代码。

python -m playwright codegen http://example.com/

Playwright 还有很多命令行功能,比如生成截图等等,可以通过 python -m playwright -h 查看。

其他

除此之外,Playwright 还支持处理页面弹出的窗口,模拟键盘,模拟鼠标拖动(用于滑动验证码),下载文件等等各种功能,请查看官方文档吧,这里不赘述了。对于写爬虫来说,Playwright 的几个特性可以说是秒杀 Puppeteer/Pyppeteer:

  1. 官方同步版本的 API
  2. 方便导入导出 Cookies
  3. 轻量级设置和切换代理
  4. 支持丰富的选择表达式

快点用起来吧!

后记:实战采坑

使用浏览器类工具做爬虫的最大隐患就是内存爆了,所以一定要记得使用 with 语句来确保浏览器关掉了。

参考

  1. https://playwright.dev/python/docs/core-concepts
  2. https://www.checklyhq.com/learn/headless/request-interception/