计算机

一分钟学一个 Python 标准库之 Pathlib

相对于 os.path 来说,Pathlib 极大地简化了路径相关的操作。举个例子来说:

获取当前文件路径下的 default.yaml 文件

import os

os.path.join(os.path.dirname(__file__), "default.yaml")

只需要:

from pathlib import Path

Path(__file__).parent / "default.yaml"

不难看出,pathlib 有几个优点:

  1. 字符少了很多,出 bug 的地方就少了很多;
  2. 和操作系统一样,pathlib 直接使用路径分隔符 / 来操作,而不是 join 的两个参数;
  3. 非常符合直觉,从左到右阅读。而 os.path 使用函数参数,需要来回跳跃理解。相比之下,Pathlib 阅读起来很清晰。

Python3 标准库中所有接受 str 作为路径参数的地方,现在都可以接受一个 path 对象了。如果你使用的第三方库只接受 str 作为参数,
可以通过 path_str = str(path) 来转化一下。

Pathlib 的其他常用方法:

from pathlib import Path

cwd = Path.cwd()  # 获取当前目录
home = Path.home()  # 获取家目录,比如 /home/ubuntu
path = Path("/home/yifei")  # 创建一个新的 path 对象

path.is_dir()  # 是否是目录
path.is_file()  # 是否是普通文件
path.exists()  # 路径是否存在
path.resolve()  # 解析成绝对路径,比如 Path(".").resolve() 相当于 Path.cwd()

path.mkdir(parents=True, exists_ok=True) # 类似 mkdir -p

# 遍历目录
for child in path.iterdir():
    print(child)

# 除此之外,还有几个很甜的方法,省去了 with open 语句
path = Path("/home/ubuntu/readme.txt")
text = path.read_text()
path.write_text(text)

path = Path("/home/ubuntu/image.png")
image = path.read_bytes()
path.write_bytes(image)

>>> path
PosixPath('/home/ubuntu/test.md')
>>> path.name
'test.md'
>>> path.stem
'test'
>>> path.suffix
'.md'
>>> path.parent
PosixPath('/home/ubuntu')
>>> path.parent.parent
PosixPath('/home')
>>> path.anchor
'/'

在这个注意力涣散的年代,想要经常写一些长篇大论的帖子实在太难了,可能一个月才能憋出一篇。
不如分享写简单但是有用的知识点,或许还能经常更新。

以上就是全部内容啦。

参考

  1. https://realpython.com/python-pathlib/
  2. https://docs.python.org/3/library/pathlib.html

爬虫数据存储的一些选择

爬虫是一种典型的「读少写多」的应用场景。而且爬虫产生的数据一般是作为离线数据,而不是在线数据。
也就是说这些数据主要是用于离线数据分析,而不是直接供线上用户查询使用。
即使线上需要使用爬虫数据,也会需要经过清洗处理过后再放到在线库。总之,不会直接供线上直接使用。

本文总结了一些经验和踩过坑,希望能对读者有帮助。

写入通用原则

  1. 要批量写,不要挨个写
  2. 要顺序写,不要随机写

如果我们开了一个多线程的爬虫,然后每个线程每爬到一条数据就调一下 db.insert(item) 插入数据,数据一多就是灾难性的。

首先,每个线程持有一个数据库链接对数据库的负载就产生了不小压力。其次,每条数据都去调用数据库,那么每次插入时间都得
加上数据库的往返时间,也就是 2RTT(round trip time)。再者,每次插入的都是不同的数据,可能在磁盘的不同位置,导致
磁盘的写入时间大部分都花在寻道上了,磁盘 IO 时间会大幅提升。最后,如果是 SQL 型的数据库,默认配置下,
可能还会有事务的影响。

正确的做法是——用队列。不同的线程都把数据先发到一个队列中,不管是你用的语言自带的内存里的 Queue,还是 redis list,
或者是 kafka,都可以。然后由另一个线程或者脚本读取这个队列,把数据整理之后,定期或者定量写入数据库。

这样做基本上解决了上面提到的每个问题。只有存储线程持有数据库链接,每一条数据不会在需要 2RTT,摊薄下来可能是 2RTT / 1000,
数据经过整理后,每次可以都插入统一中类型的数据,磁盘不需要总是在寻道。

当然,这种方案也会引入一些新的问题,需要注意解决:

  1. 处理流程变成了异步,不能实时在数据中看到最新的爬取结果
  2. 多了消息队列的环节,多了丢数据的可能
  3. 如果消息队列是内存性的,不要让消息队列爆了

以上这些问题都是使用 MQ 的常见问题了,这里不再展开。

数据库选型

大概有这些选择:

  1. CSV、JSON、SQLite 或者其他单机文件
  2. SQL 数据库
  3. MongoDB 等文档数据库
  4. HBase 等 Hadoop 生态圈存储
  5. S3 类型对象存储
  6. Kafka 等持久性消息队列

如果你只是简单地「单线程」爬几页数据分析用,那么存个 CSV 或者 JSON 就可以了。如果你开始上多线程,甚至多机了,
既要考虑写入的时候加锁,而且没法分布式写入,单文件存储就不太合适了。

规模再大一点可以考虑 MySQL。虽然 MySQL 是一个 OLTP 数据库,通常意义上来说更适用于线上数据库。但是对于数据量不大的爬虫来说,
比如说总数据量不会超过 100GiB,也已经足够用了。而且 MySQL 可以添加 unique 索引,一定程度上还能帮助解决数据去重的问题。
这里有几点需要注意:

  1. 使用 ORM 创建的表中包含了不少外键约束之类的东西,对于爬来的数据,中间插入的时候可能还不满足这个外键,最好把这个约束删除掉
  2. 2.

有些人喜欢用 MongoDB 来存储爬虫数据,他们给出的理由也很有吸引力——爬虫数据多是半结构化的,而且数据结构可能经常跟着源网站变,
用 MongoDB 这种 schemaless 的文档数据库再合适不过了。然而我个人非常不推荐用 MongoDB,原因如下:

  1. 我觉得定义好表结构不是一个缺点,反而是一个优点,这样能够在开发调试阶段就发现各种异常情况,保证程序稳定
  2. MySQL 的图形化客户端太多了,比如说 Navicat,Sequel Pro 等等。对于小公司来说,这个客户端就已经够用了,根本不需要开发什么单独的管理后台
    相反,MongoDB 基本没有什么特别好用的客户端
  3. MySQL 和 Postgres 也早就支持了 JSON 字段,实在不是特别规整的数据,存在 JSON 字段就行了
  4. 数据分析的第三方库,比如 pandas,对 SQL 的支持都是原生的,一个 read_sql 就把数据读出来了

数据再多一些,或者并发量再大一些的话,可能单独使用 MySQL 就不合适了。这时候你可以对 MySQL 做定期归档,比如说把添加时间在一个月以上的数据
都按日期写入到 Hive 或者 S3 中,然后删除掉 MySQL 中的数据。这样的做法,其实相当于隐式地把 MySQL 作为了一个消息队列,并起到了缓冲的作用。

再者,MySQL 这种毕竟是行式数据库,如果你的数据数值居多,也可以跳过 MySQL,考虑直接存储到列式数据库中。下游的 Spark,Flink 这些消费端可能更喜欢读取列式数据。

最后一种选项是直接使用 Kafka 这种持久化的消息队列作为存储。DDIA 这本书中提到一个有趣的观点:数据库是日志的积分,日志是数据库的导数。
从某种意义上来说,两者所含有的信息是等价的,可以相互转换。所以直接使用消息队列作为数据存储也未尝不可。

总之,对于爬虫这种场景来说,最重要的特点是「读少写多」,按照这个思路去选择问题不大。除了这里提到的一些数据库,还有 Cassandra,FoundationDB 等一些
数据库没有提到,在特定的场景下也都值得考虑。对于存储的选择也不只是一个技术问题,可能更重要的是你的公司现在有什么,选一个比较合适的用就好了。

参考

  1. https://www.zhihu.com/question/479761564
  2. https://www.zhihu.com/question/36110917

Kubernetes 定时任务

参照官方文档,直接比着写就行了:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"  # 也支持 @hourly 这些语法
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            imagePullPolicy: IfNotPresent
            command:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

列出所有 cronjob

kubectl get cronjob [JOB_NAME]

NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     0        <none>          10s

列出 cronjob 所有运行过的实例:

kubectl get jobs --watch

NAME               COMPLETIONS   DURATION   AGE
hello-4111706356   0/1                      0s
hello-4111706356   0/1           0s         0s
hello-4111706356   1/1           5s         5s

需要注意的一点,也是 Linux 上的 cron 命令忽视的一点,K8s 提供了 .spec.concurrentPolicy 选项,
用来选择当上一任务还没有执行完毕时,如何处理并发。

  • Allow,默认选项,只要到了时间就触发,并发执行
  • Forbid,如果上一个任务没有执行完毕,忽略本次任务
  • Replace,如果上一个任务没有执行完毕,用新任务替换掉老任务

参考

  1. https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/

SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data

放弃 requests,拥抱 httpx

httpx 是新一代的 Python http 请求库,它几乎和 requests 的 API 无缝兼容,几乎不用改代码。相对于 requests 来说,有以下优点:

  • 支持 asyncio, 可以直接 async/await 啦
  • 支持 http 2, requests 一直都不支持
  • 实现了正确的 http 代理
  • Cookie 的 API 更友好
  • 有自己的网络连接池,requests 是基于第三方的 urllib3
  • 文档更有条理,更深入

还有就是 requests 有比较明显的内存泄漏,目前我还没有测试过 httpx,所以就不列到优点里了。

httpx 不支持在 client.request 中使用 proxies,必须在 Client 初始化时候指定,这样做应该是考虑到链接池的实现。

Axios 的基本使用

fetch 虽然在现代浏览器中已经支持很好了,但是 API 还是没有那么好用。比如说增加 GET 参数必须使用 URLSearchParm 这个类。相比之下,Axios 的 API 还是更接近直觉一些。

import axios from 'axios'

axios(url, [config]);
axios(config);
axios.get(url, [config])
axios.delete(url, [config])
axios.put(url, data, [config])
axios.post(url, data, [config])

// 请求中 config 的属性
const config = {
    headers: {}, // Headers
    params: {},  // GET 参数
    data: {},  // 默认情况下 {} 会按照 JSON 发送
    timeout: 0, // 默认是没有 timeout 的
}

// 如果要发送传统的 POST 表单
const config = {
    data: new FormData()
}

// headers 会根据 data 是 json 还是表单自动设置,非常贴心。

// 响应的属性
let res = await axios.get("api")
res = {
    data: {},  // axios 会自动 JSON.parse
    status: 200,
    statusText: "OK",
    headers: {},
    config: {},
    request: {}
}
// 和 fetch 不同的是,res.data 可以直接使用,而 fetch 还需要 await res.json()

// 如果要添加一些默认的设置,使用
axios.defaults.headers.common["X-HEADER"] = "value"
axios.defaults.timeout = 3000
// 具体参见这里: https://github.com/axios/axios/blob/master/lib/defaults.js#L28

重定向与其他错误状态码

对于 3XX 重定向代码,axios 只能跟中,这是浏览器决定的,不是 axios 可以改变的。
对于 4XX 和 5XX 状态码,axios 会抛出异常,可以 catch 住。

上传文件

如果一个表单中包含的文件字段,那么传统的方法是把这个字段做为表单的一部分上传。然而现代的 API 大多是 json 接口,并没有 POST 表单这种格式。
这时候可以单独把上传作为一个接口,通过 POST 表单的方式上传,然后返回上传后的路径。在对应的 API 的接口中附件的字段填上这个路径就好了。

参考

  1. https://stackoverflow.com/questions/4083702/posting-a-file-and-associated-data-to-a-restful-webservice-preferably-as-json

Boto3 访问 S3 的基本用法

以前我以为文档坑爹只有一种方式,那就是没有文档。最近在用 boto3, 才让我认识到了文档的另一种坑爹方式:屁话太多。

具体来说是这样的:Boto3 中有两种 API, 低级和高级。其中低级 API 是和 AWS 的 HTTP 接口一一对应的,
通过 boto3.client(“xxx”) 暴露。高级接口是面向对象的,更加易于使用,通过 boto3.resource(“xxx”) 暴露,
美中不足是不一定覆盖了所有的 API.

坑爹的 AWS 文档中,经常混用 resource 和 client 两套接口,也没有任何提示,文档的首页除了简单的提了一句有两套 API 外再没有单独的介绍了。
在没写这篇文章之前,我的脑子里都是乱的,总觉得 S3(Simple Storage Service) 的这个狗屁接口哪里配得上 Simple 这个词,
一会儿是 listobject, 一会儿是 listobject_v2 的。相反,高级 API 是很简单易用的,然而这样一个简单的 API 被深深地埋在了一大堆的低级 API 中,
网上的文章也是一会儿 boto3.client, 一会儿 boto3.resource. 除了有人特意提问两者的区别,很难看到有人说这俩到底是啥。

吐槽完毕。

最近总是用到 S3, 在这里记录一下 Boto3 的简单用法了。Boto3 是整个 AWS 的 SDK, 而不只是包括 S3. 还可以用来访问 SQS, EC2 等等。

如果没有特殊需求的话,建议使用高级 API. 本文一下就记录一些 boto3.resource("s3") 的例子。

import boto3

import boto3

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
)

# s3 = boto3.resource("s3")
s3 = session.resouce("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 删除一些对象
bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'
            },
        ],
        'Quiet': True|False
    },
)
# 返回结果
{
    'Deleted': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'DeleteMarker': True|False,
            'DeleteMarkerVersionId': 'string'
        },
    ],
    'RequestCharged': 'requester',
    'Errors': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'Code': 'string',
            'Message': 'string'
        },
    ]
}

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 下载到文件对象,可能会自动开启多线程下载
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# 上传文件
object = bucket.put_object(Body=b"data"|file, ContentMD5="", Key="xxx")

# 这个方法会自动开启多线程上传
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容
obj.put(Body=b"xxx"|file, ContentMD5="")

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

如果想进一步了解,建议直接点开参考文献 2, 阅读下 resouce 相关接口的文档,其他的低级 client 接口完全可以不看。

参考

  1. https://stackoverflow.com/questions/42809096/difference-in-boto3-between-resource-client-and-session
  2. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource

对比 Flask,学习 FastAPI

本文假定你对 Flask 等 web framework 已经比较熟悉。不过多解释为什么要这么做,直接上干货。FastAPI 的一些优势大概有:类型检查、自动 swagger 文档、支持 asyncio、强大的依赖注入系统,我在 这个回答 里具体写过,就不展开了。

安装

注意一定要加上这个 [all], 免得后续缺什么依赖。

pip install fastapi[all]

Hello, World

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def hello():
    return "Hello, World!"

在命令行开启调试服务器:

uvicorn main:app --reload --port 8000

然后访问 http://localhost:8000 就可以看到 hello world 啦。更妙的是,访问 http://localhost:8000/docs 就可以看到自动生成,可以交互的 Swagger UI 文档了。

注意在上面的装饰器中,直接使用了 app.get 来定义 GET 访问的路径,其他的 post/put/delete 也都同理。Flask 在 1.x 中还必须使用 app.route,在 2.0 中才赶了上来。

同步和异步

# 同步函数
@app.get("/sync")
def hello():
    return "Hello, World!"

# 异步函数
@app.get("/async")
async def hello2():
    return "Hello, World!"

FastAPI 原生支持同时同步和异步函数,而且是完全一样的用法。使用异步函数肯定性能最好,但是有好多库,尤其是访问数据库的库用不了。使用同步函数的话,也不会把整个 Event Loop 锁死,而是会在线程池中执行。所以我个人的建议是,如果你已经习惯了写 Async Python,那么完全可以全部都写异步函数。如果你刚刚开始接触 asyncio 这一套,那么完全可以大部分都写同步函数,通过 profile 找到调用最多,或者对性能影响最大的地方,把这部分切换到异步函数。

请求参数

废话不多说,看请求参数都有哪些。

路径参数

@app.get("/{name})
def hello(name: str):
    return "Hello " + name

访问 http://127.0.0.1:8000/docs 就可以看到 OpenAPI 的文档了。

如果在文档中需要使用 Enum 的话,那么创建一个 Enum 的子类:

from enum import Enum

class ModelName(str, Enum):
   alexnet = "alexnet"
   resnet = "resnet"

@app.get("/models/{model_name}")
def get_model(model_name: ModelName):
    ...

值得注意的一点是,对于结尾的 “/”, FastAPI 不会强制要求匹配。如果你的请求和定义的不一样,那么 FastAPI 会 307 重定向到定义的那一个。比如定义了 app.get("/items/"), 而请求是 requests.get("/items"), 那么会 307 到 /items/. 但是如果部署在代理后面,重定向的地址可能不对,这个可能会引起问题,建议还是前后端统一一下。

另一个需要注意的地方,顺序是很重要的,FastAPI 按照定义的顺序来匹配路由。比如说你定义了两个路径 /users/me/users/{id}, 那么,一定要把 /users/me 放在前边,否则就会匹配到 users/{id}.

url query 参数

GET 参数是 FastAPI 和 Flask 不一样的地方。FastAPI 中统一用函数参数获取,这样也是为了更好地实现类型检查:

fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]

# GET /items/?skip=100&limit=10
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    return fake_items_db[skip : skip + limit]

对于 bool 类型,URL 中的 1/true/True/on/yes 都会被转化为 True, 其他都是 False.

http body 参数

POST 参数统一使用 json, 需要使用 PyDantic 的 BaseModel 来定义,只要参数是 BaseModel 类型的,FastAPI 就会从 body 中获取:

from pydantic import BaseModel

class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None

@app.post("/items/")
async def create_item(item: Item):
    return item

@app.put("/items/{item_id}")
async def create_item(item_id: int, item: Item):
    return {"item_id": item_id, **item.dict()}

如果不想定义 Model 验证,只想直接读取请求,那么可以使用 Request 对象,不过 Request 对象只有 async 版本,要想在同步函数中使用,需要一点点 work-around,这算是一个小小的坑吧:

from fastapi import Request, Depends

async def get_json(req: Request):
    return await req.json()

@app.get("/", response_model=None)
def get(req=Depends(get_json)):
    print(req)

其中用得到的 Depends 是 FastAPI 中非常强大的依赖注入系统,后面有时间了再详细讲。

Request 的文档详见:https://www.starlette.io/requests/, 参考:https://github.com/tiangolo/fastapi/issues/2574

Cookie 参数

使用 Cookie 来定义一个从 Cookie 中读取的参数:

from fastapi import Cookie, FastAPI

@app.get("/items")
def read_items(ads_id: Optional[str] = Cookie(None)):
    return {"ads_id": ads_id}

这里有个坑爹的地方,限于浏览器的安全模型,在生成的 Swagger UI 中 Cookie 参数是不能使用的。这是上游 Swagger/OpenAPI 项目的 bug(https://github.com/tiangolo/fastapi/issues/880), 所以在 FastAPI 这里也无解。

一个简单的 work-around 是:如果你有生成 Cookie 的 API, 那么先调用一下生成 Cookie 的 API, 再调用需要 Cookie 认证的 API 的时候就会自动附上这个 Cookie.

Header 参数

和 Cookie 是类似的。FastAPI 会自动把 “User-Agent” 转化成 user_agent 的形式。

from fastapi import FastAPI, Header

@app.get("/items/")
async def read_items(user_agent: Optional[str] = Header(None)):
    return {"User-Agent": user_agent}

参数验证

如果需要对参数进行验证,需要使用 Query, Path 等对象来指定,分别对应 GET 参数和路径:

from fastapi import FastAPI, Query, Path

@app.get("/items")
def read_items(q: Optional[str] = Query(None, min_length=3, max_length=50, regex="^fixedquery$"))
    ...

# 如果不使用默认值的话,可以使用特殊对象 `...`
def read_items(q: Optional[str] = Query(..., max_length=50))
# 数字可以使用 lt/le 或者 gt/ge 定义
def read_items(item_id: int = Path(..., gt=0, lt=100))

响应参数

需要在 app.get 中的 response_model 参数指定响应的 BaseModel:

from pydantic import BaseModel

class Item(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None
    tags: List[str] = []

@app.post("/items/", response_model=Item)
async def create_item(item: Item):
    return item

请求的 body 和响应的 body 经常公用一个 Model, 有些敏感信息可能不适合在响应的 body 中返回,这时候可以使用 response_model_includeresponse_model_exclude 来指定需要包含或者需要过滤出去的字段。

@app.get(
    "/items/{item_id}/name",
    response_model=Item,
    response_model_include=["name", "description"],
    response_model_exclued=["password"]
)
async def read_item_name(item_id: str):
    return items[item_id]

使用 status_code 参数可以更改返回的状态码:

@app.get("/", status_code=201)
def get():
    ...

更新 Cookie

使用 Reponse 对象的 set_cookiedelete_cookie 方法实现增删 Cookie. 注意的是,只需要对 response 对象进行操作,不需要 return response 对象。

from fastapi import FastAPI, Response

@app.post("/cookie-and-object/")
def create_cookie(response: Response):
    response.set_cookie(key="fakesession", value="fake-cookie-session-value")
    response.delete_cookie(key)
    return {"message": "Come to the dark side, we have cookies"}

产生错误

直接 raise HTTPException 就好了

from fastapi import HTTPException

@app.get("/")
def hello():
    raise HTTPException(status_code=404, detail="item not found")

使用路由模块化

FastAPI 中的 router 相当于 Flask 中的 Blueprint, 用来切分应用到不同的模块。

# views/users.py
from fastapi import APIRouter

router = APIRouter()

@router.get("/me")
def myinfo():
    ...

# main.py
from fastapi import FastAPI
from views import users

app = FastAPI()

app.include_router(users.router, prefix="/users")

就到这里吧,简单写了下基本的请求响应和 Router 的使用,这是这个系列的第一篇,敬请期待后续。最后说一句,FastAPI 的官方文档非常详尽,值得一看。不过对于已经有 web 开发知识的人来说稍显啰嗦。

访问日志

FastAPI 默认的日志似乎有一些问题,不会显示出访问日志来

参考

FastAPI 官方文档

Playwright: 比 Puppeteer 更好用的浏览器自动化工具

在 Playwright 之前,我一般会使用 Selenium 或者 Puppeteer 来进行浏览器自动化操作。然而,Selenium 经常会有一些奇怪的 bug, Puppeteer 则是没有官方 Python 版,非官方版本也只有 async 版本,并且也是有一些奇怪的 bug. 另外,众所周知,Python 的 Async API 并不是那么好用。

Playwright 是微软出品的浏览器自动化工具,大厂开源作品,代码质量应该是有足够保证的。而且它还官方支持同步版的 Python API, 同时支持三大浏览器,所以赶紧切换过来了。

特别注意 Playwright 的拼写,别把中间的 “w” 丢了。

安装

pip install playwright==1.8.0a1  # 很奇怪,必须指定版本,不指定会安装到一个古老的版本
python -m playwright install  # 安装浏览器,此处国内网络可能会有问题(你懂的),请自行解决

基本使用

Playwright 支持 Firefox/Chrome/WebKit(Safari). 其中 webkit 最轻量了,所以没有什么特殊需求最好使用 webkit, 不要使用 chromium.

from playwright.sync_api import sync_playwright as playwright

with playwright() as pw:
    webkit = pw.webkit.launch(headless=False)
    context = webkit.new_context(
        extra_http_headers={},
        ignore_https_erros=True,
        proxy={"server": "http://proxy.com"},
        user_agent="Mozilla/5.0...",
        viewport={"height": 1280, "width": 800}
    )  # 需要创建一个 context
    page = context.new_page()  # 创建一个新的页面
    page.goto("https://www.apple.com")
    print(page.content())
    webkit.close()

Playwright 官方推荐使用 with 语句来访问,不过如果你不喜欢的话,也可以用 pw.start() 和 pw.stop().

pw = playwright().start()
pw.webkit.launch()

Context 和 Page 没有提供 with 语句来确保关闭,可以使用 with closing 来实现,或者直接用 try…finally

from contextlib import closing

with closing(webkit.new_context()) as context:
    page = context.new_page()

context = webkit.new_context()
try:
    ...
finally:
    context.close()

新概念:Context

和 Puppeteer 不同的是,Playwright 新增了 context 的概念,每个 context 就像是一个独立的匿名模式会话,非常轻量,但是又完全隔离。比如说,可以在两个 context 中登录两个不同的账号,也可以在两个 context 中使用不同的代理。

通过 context 还可以设置 viewport, user_agent 等。

context = browser.new_context(
  user_agent='My user agent'
)
context = browser.new_context(
  viewport={ 'width': 1280, 'height': 1024 }
)
context = browser.new_context(
    http_credentials={"username": "bill", "password": "pa55w0rd"}
)

# new_context 其他几个比较有用的选项:
ignore_https_errors=False
proxy={"server": "http://example.com:3128", "bypass": ".example.com", "username": "", "password": ""}
extra_http_headers={"X-Header": ""}

context 中有一个很有用的函数context.add_init_script, 可以让我们设定在调用 context.new_page 的时候在页面中执行的脚本。

# hook 新建页面中的 Math.random 函数,总是返回 42
context.add_init_script(script="Math.random = () => 42;")
# 或者写在一个文件里
context.add_init_script(path="preload.js")

还可以使用 context.expose_bindingcontext.expose_function 来把 Python 函数暴露到页面中,不过个人感觉还是使用 addinitscript 暴露 JS 函数方便一些。

和 Puppeteer 一样,Playwright 的核心概念依然是 page, 核心 API 几乎都是 page 对象的方法。可以通过 context 来创建 page.

页面基本操作

按照官网文档,调用 page.goto(url) 后页面加载过程:

  1. 设定 url
  2. 通过网络加载解析页面
  3. 触发 page.on(“domcontentloaded”) 事件
  4. 执行页面的 js 脚本,加载静态资源
  5. 触发 page.on(“laod”) 事件
  6. 页面执行动态加载的脚本
  7. 当 500ms 都没有新的网络请求的时候,触发 networkidle 事件

page.goto(url) 会跳转到一个新的链接。默认情况下 Playwright 会等待到 load 状态。如果我们不关心加载的 CSS 图片等信息,可以改为等待到 domcontentloaded 状态,如果页面是 ajax 加载,那么我们需要等待到 networkidle 状态。如果 networkidle 也不合适的话,可以采用 page.waitforselector 等待某个元素出现。不过对于 click 等操作会自动等待。

page.goto(url, referer="", timeout=30, wait_until="domcontentloaded|load|networkidle")

Playwright 会自动等待元素处于可操作的稳定状态。当然也可以用 page.wait_for_* 函数来手工等待:

page.wait_for_event("event", event_predict, timeout)
page.wait_for_function(js_function)
page.wait_for_load_state(state="domcontentloaded|load|networkidle", timeout)
page.wait_for_selector(selector, timeout)
page.wait_for_timeout(timeout)  # 不推荐使用

对页面的操作方法主要有:

# selector 指的是 CSS 等表达式
page.click(selector)
page.fill(selector, value)  # 在 input 中填充值

# 例子
page.click("#search")

# 模拟按键
page.keyboard.press("Z")
# 支持的按键名字
# F1 - F12, Digit0- Digit9, KeyA- KeyZ, Backquote, Minus, Equal, Backslash, Backspace, Tab, Delete, Escape, ArrowDown, End, Enter, Home, Insert, PageDown, PageUp, ArrowRight, ArrowUp
# 如果是单个字母,那么是大小写敏感的,比如 a 和 A 发送的是不同的按键
# 组合按键
page.keyboard.press("Shift+A")  # Shift, Ctrl, Meta, Alt, 其中 Meta 应该是相当于 Win/Cmd 键
# 其他的按键参考这里:https://playwright.dev/python/docs/api/class-keyboard

# 模拟输入
page.keyboard.type("hello")

在编写爬虫的过程中,向下滚动触发数据加载是个很常见的操作。但是如果想要向下滚动页面的话,似乎现在没有特别好的方法。本来我以为会有 page.scroll 这种方法的,然而并没有。在没有官方支持之前,暂时可以使用 page.keyboard.press("PageDown") 来向下滚动。

获取页面中的数据的主要方法有:

page.url  # url
page.title()  # title
page.content()  # 获取页面全文
page.inner_text(selector)  # element.inner_text()
page.inner_html(selector)
page.text_content(selector)
page.get_attribute(selector, attr)

# eval_on_selector 用于获取 DOM 中的值
page.eval_on_selector(selector, js_expression)
# 比如:
search_value = page.eval_on_selector("#search", "el => el.value")

# evaluate 用于获取页面中 JS 中的数据,比如说可以读取 window 中的值
result = page.evaluate("([x, y]) => Promise.resolve(x * y)", [7, 8])
print(result) # prints "56"

选择器表达式

在上面的代码中,我们使用了 CSS 表达式(比如#button)来选取元素。实际上,Playwright 还支持 XPath 和自己定义的两种简单表达式,并且是自动识别的。

自动识别实际上是有一点点坑的,凡是 . 开头的都会被认为是 CSS 表达式,比如说 .close, .pull-right 等等,但是 .//a 这种也会被认为是 CSS 导致选择失败,所以使用 XPath 的时候就不要用 . 开头了。

# 通过文本选择元素,这是 Playwright 自定义的一种表达式
page.click("text=login")

# 直接通过 id 选择
page.click("id=login")

# 通过 CSS 选择元素
page.click("#search")
# 除了常用的 CSS 表达式外,Playwright 还支持了几个新的伪类
# :has 表示包含某个元素的元素
page.click("article:has(div.prome)")
# :is 用来对自身做断言
page.click("button:is(:text('sign in'), :text('log in'))")
# :text 表示包含某个文本的元素
page.click("button:text('Sign in')")  # 包含
page.click("button:text-is('Sign is')")  # 严格匹配
page.click("button:text-matches('\w+')")  # 正则
# 还可以根据方位匹配
page.click("button:right-of(#search)")  # 右边
page.click("button:left-of(#search)")  # 左边
page.click("button:above(#search)")  # 上边
page.click("button:below(#search)")  # 下边
page.click("button:near(#search)")  # 50px 之内的元素

# 通过 XPath 选择
page.click("//button[@id='search'])")
# 所有 // 或者 .. 开头的表达式都会默认为 XPath 表达式

对于 CSS 表达式,还可以添加前缀css=来显式指定,比如说 css=.login 就相当于 .login.

除了上面介绍的四种表达式以外,Playwright 还支持使用 >> 组合表达式,也就是混合使用四种表达式。

page.click('css=nav >> text=Login')

复用 Cookies 等认证信息

在 Puppeteer 中,复用 Cookies 也是一个老大难问题了。这个是 Playwright 特别方便的一点,他可以直接导出 Cookies 和 LocalStorage, 然后在新的 Context 中使用。

# 保存状态
import json
storage = context.storage_state()
with open("state.json", "w") as f:
    f.write(json.dumps(storage))

# 加载状态
with open("state.json") as f:
    storage_state = json.loads(f.read())
context = browser.new_context(storage_state=storage_state)

监听事件

通过 page.on(event, fn) 可以来注册对应事件的处理函数:

def log_request(intercepted_request):
    print("a request was made:", intercepted_request.url)
page.on("request", log_request)
# sometime later...
page.remove_listener("request", log_request)

其中比较重要的就是 request 和 response 两个事件

拦截更改网络请求

可以通过 page.on(“request”) 和 page.on(“response”) 来监听请求和响应事件。

from playwright.sync_api import sync_playwright as playwright

def run(pw):
    browser = pw.webkit.launch()
    page = browser.new_page()
    # Subscribe to "request" and "response" events.
    page.on("request", lambda request: print(">>", request.method, request.url))
    page.on("response", lambda response: print("<<", response.status, response.url))
    page.goto("https://example.com")
    browser.close()

with playwright() as pw:
    run(pw)

其中 request 和 response 的属性和方法,可以查阅文档:https://playwright.dev/python/docs/api/class-request

通过 context.route, 还可以伪造修改拦截请求等。比如说,拦截所有的图片请求以减少带宽占用:

context = browser.new_context()
page = context.new_page()
# route 的参数默认是通配符,也可以传递编译好的正则表达式对象

# 1
context.route("**/*.{png,jpg,jpeg}", lambda route: route.abort())
# 2
context.route(re.compile(r"(\.png$)|(\.jpg$)"), lambda route: route.abort())
# 3
def no_static(route, req):
    if req.resource_type in {"image", "stylesheet", "media", "font"}:
        route.abort()
    else:
        route.continue_()
context.route("**/*", no_static)
page.goto("https://example.com")

其中 route 对象的相关属性和方法,可以查阅文档:https://playwright.dev/python/docs/api/class-route

灵活设置代理

Playwright 还可以很方便地设置代理。Puppeteer 在打开浏览器之后就无法在更改代理了,对于爬虫类应用非常不友好,而 Playwright 可以通过 Context 设置代理,这样就非常轻量,不用为了切换代理而重启浏览器。

context = browser.new_context(
    proxy={"server": "http://example.com:3128", "bypass": ".example.com", "username": "", "password": ""}
)

杀手级功能:录制操作直接生成代码

Playwright 的命令行还内置了一个有趣的功能:可以通过录制你的点击操作,直接生成 Python 代码。

python -m playwright codegen http://example.com/

Playwright 还有很多命令行功能,比如生成截图等等,可以通过 python -m playwright -h 查看。

其他

除此之外,Playwright 还支持处理页面弹出的窗口,模拟键盘,模拟鼠标拖动(用于滑动验证码),下载文件等等各种功能,请查看官方文档吧,这里不赘述了。对于写爬虫来说,Playwright 的几个特性可以说是秒杀 Puppeteer/Pyppeteer:

  1. 官方同步版本的 API
  2. 方便导入导出 Cookies
  3. 轻量级设置和切换代理
  4. 支持丰富的选择表达式

快点用起来吧!

后记:实战采坑

使用浏览器类工具做爬虫的最大隐患就是内存爆了,所以一定要记得使用 with 语句来确保浏览器关掉了。

参考

  1. https://playwright.dev/python/docs/core-concepts
  2. https://www.checklyhq.com/learn/headless/request-interception/

React 中的表单

表单元素和所有其他元素的不同之处在于:表单元素是输入组件,它是有内部状态的,所有的其他元素都是输出元素。原生的 HTML 表单在 React 中本身也是能用的,但是一般情况下,我们可能会交给一个函数来处理提交表单这个事情,因为需要表单验证等等,这时候我们就可以用”受控组件”了。

什么是受控组件

受控组件就是数据不保存在 DOM 中,而是始终保存在 JS 中,随时通过 value/onChange 读取验证。非受控组件数据依然保存在 DOM 树中,只在 submit 时候读取。

function MyForm(props) {
  value, setValue = useState("");

  function handleChange(e) {
    setValue(e.target.value)
  }

  function handleSubmit(e) {
    console.log("A name was submitted: " + value);
    e.preventDefault();
  }

  return (
    <form onSubmit={handleSubmit}>
      <label>
        Name:
        <input type="text" value={value} onChange={onChange} />
      </label>
      <input type="submit" value="Submit" />
    </form>
  );
}

使用原生表单

完全可以使用原生组件,而不要直接使用受控组件。

function form2kv(form) {
  const fd = new FormData(form);
  const ret = {};
  for (let key of fd.keys()) {
    ret[key] = fd.get(key);
  }
  return ret;
}

function MyForm(props) {

  // 也可以直接通过读取 events.target 的值来获取
  function handleSubmit(event) {
    event.preventDefault();
    console.log(event.target.elements.username.value) // from elements property
    console.log(event.target.username.value)          // or directly
  }

  function handleSubmit(e) {
    e.preventDefault();
    const data = form2kv(e.target);
    axios.post("/api/myresource", data);
  }
  return (
    <form onSubmit={handleSubmit}>
      <label htmlFor="username">Enter username</label>
      <input type="text" id="username" name="username">
      <label htmlFor="password">Enter password</label>
      <input type="password" id="password" name="password">
      <label htmlFor="birthdate">Enter your birth date</label>
      <input type="date" id="birthdate" name="birthdate">
      <button type="submit">Send data!</button>
    </form>
  )
}

在上面的例子中,我们使用了浏览器自带的 Formdata 对象,通过传入 DOM 中的 Form 元素来读取其中的数据。另外需要注意的一点是,我们使用了 onSubmit 事件,而不是 Submit Button 的 onClick 时间,这样保留了浏览器的原生处理方式,也就是可以使用回车键提交。

处理数据

如果当我们需要变换数据格式时呢?

  • 用户输入的数据是 MM/DD/YYYY, 服务端要求是 YYYY-MM-DD
  • 用户名应该全部大写

这时候可以使用另一个小技巧,data-x 属性。

const inputParsers = {
  date(input) {
    const [month, day, year] = input.split('/');
    return `${year}-${month}-${day}`;
  },
  uppercase(input) {
    return input.toUpperCase();
  },
  number(input) {
    return parseFloat(input);
  },
};

function form2kv(form, parsers = {}) {
  const fd = new FormData(form);
  const ret = {};
  for (let name of data.keys()) {
    const input = form.elements[name];
    const parseName = input.dataset.parse;

    if (parseName) {
      const parser = inputParsers[parseName];
      const parsedValue = parser(data.get(name));
      ret[name] = parsedValue;
    } else {
      ret[name] = data.get(name);
    }
  }
  return ret;
}

function MyForm(props) {
  function handleSubmit(e) {
    e.preventDefault();
    const data = form2kv(e.target);
    fetch("api/myresource", method="POST", body=JSON.stringify(data));
  }
  return (
    <form onSubmit={handleSubmit}>
      <label htmlFor="username">Enter username</label>
      <input type="text" id="username" name="username" data-parse="uppercase">
      <label htmlFor="password">Enter password</label>
      <input type="password" id="password" name="password">
      <label htmlFor="birthdate">Enter your birth date</label>
      <input type="date" id="birthdate" name="birthdate" data-parse="date">
      <button type="submit">Send data!</button>
    </form>
  )
}

在上面的例子中,我们通过使用 data-x 属性指定了应该使用的处理函数。通过表单的 elements 属性,我们可以按照 input 的 name 访问元素。而节点的 data-x 属性可以在 dataset[x] 属性中访问。

验证表单

HTML 的原生 API 就支持了很多的验证属性,其中最最好用的就是 pattern 属性了,他可以指定一个正则来验证表单。而且我们也可以通过 type 属性来制定 email 等等特殊的 text 类型。HTML 支持的验证属性有:

  • required, 必填字段,不能为空
  • pattern, 通过一个正则来验证输入
  • minlength/maxlength, 输入文本的长度
  • min/max 数字的区间
  • type, email 等等类型

在 JS 中可以调用 form.checkValidity() 函数获得检查结果。还需要在 form 中添加 novalidate 属性,否则浏览器就会在校验失败的时候直接不触发 submit 事件了。

当表单验证不通过的时候,会获得一个 :invalid 的伪类。但是比较坑爹的是,当表单还没有填写的时候,验证就已经生效了,这样对用户来说非常不友好,不过也很好克服,我们只要设定一个类,比如说 display-errors, 只有当用户尝试提交表单之后再应用这个类就好啦~

displayErrors, setDisplayErrors = useState(false);

function handleSubmit(e) {
  if (!e.target.checkValidity()) {
    setDisplayError(true);
  }
}

return (
  <form
    onSubmit={handleSubmit}
    noValidate  // 即使校验失败也要触发
    className={displayErrors ? 'displayErrors' : ''}
  >
    {/* ... */}
  </form>
)
.displayErrors input:invalid {
  border-color: red;
}

select 组件

原生的 select 组件

<select>
  <option value="grapefruit">Grapefruit</option>
  <option value="lime">Lime</option>
  <option selected value="coconut">Coconut</option>
  <option value="mango">Mango</option>
</select>

在受控的组件中,可以在 select 组件中使用 value 属性指定选项。读取可以使用 e.target.value

<select value={state.value} onChange={handleChange}>
  <option value="grapefruit">Grapefruit</option>
  <option value="lime">Lime</option>
  <option value="coconut">Coconut</option>
  <option value="mango">Mango</option>
</select>

如果需要多选的话,使用 <select multiple={true} value={['B', 'C']}>

file 组件

file 组件是只读的,所以只能使用

textarea 组件

react-hook-form

对于更复杂的逻辑,推荐使用 react-hook-form

react-hook-form 的 v7 中使用的是未受控的组件,这样性能更好。最近一直在使用 tailwind UI,对于原生组件来说写样式更方便一点。

当使用 RHF 的时候,可以配合使用 Yup 来做表单验证。

使用 antd 表单

表单的基本元素是 <Form/>, Form 中嵌套 Form.Item, 其中再嵌套 Input/TextArea 等组件。

表单的数据都在 form 元素中,使用 const form = Form.useForm() 来获取,并通过 form={form} 传递给 Form 控件。

设置了 name 属性的 Form.Item 会被 form 接管,也就是:

  1. 不需要通过 onChange 来更新属性值。当然依然可以通过 onChange 监听
  2. 不能使用 value 和 defaultValue,只能使用 form 的 initialValues/setFieldsValue 设置

总体来说,antd 的表单还是封装过度了,非常难以定制化。

参考

  1. https://reactjs.org/docs/uncontrolled-components.html
  2. Controlled and uncontrolled form inputs in React don’t have to be complicated
  3. How to handle forms with just React
  4. https://reactjs.org/docs/forms.html
  5. https://jsfiddle.net/everdimension/5ry2wdaa/
  6. https://developer.mozilla.org/en-US/docs/Web/API/HTMLFormElement/elements
  7. https://www.cnblogs.com/wonyun/p/6023363.html
  8. https://stackoverflow.com/questions/23427384/get-form-data-in-reactjs
  9. https://github.com/jquense/yup