半监督学习

半监督学习,也叫自训练,是指通过小样本训练一个分类器,利用分类器去分类未标注样本,再反过来利用得到的数据继续迭代优化分类器的方法。
通过半监督学习学习得到的分类器可能比只通过小样本训练出来的分类器效果要好,毕竟见多识广跑得快。

半监督学习也很有可能是人脑学习新知识的模式。毕竟我们在学习新知识的时候,不会有成千上万的训练数据,而是通过几个例子就能举一反三。

弱监督学习成立的条件在于相似的样本大概率是同一类的,效果的好坏取决于相似性的判断好不好。模型除了向伪标签类学习以外,也会像临近的真标签学习。

比如一开始只标了一张男人的图片是正样本和一张母狗的图片是负样本,然后来一张公狗的图片,你说应该判断成啥?这其实很难说对吧,只有 2 个样本的那个模型也确实很难判断,不过我相信大部分人都会认为公狗应该判为负样本。
这就基于大多数人认为公狗和母狗更相近。
所以公狗这个样本虽然没有标记,但是可以利用某种相似性判断的方式,判断出和母狗更相似,因此 label 一方面是听从原先那个有监督模型的意见,另一方面还要听从相似性的意见,和母狗使用同样的标签。
比如有监督模型给出的是[0.5,0.5],相似性给出[0.1,0.9],那综合起来可能就是[0.3, 0.7],这样就认为公狗的 label 是[0.3,0.7],即更偏向负样本,但没那么确定。
一般来讲人们会让这个 label 更确定一些,也就是盲目自信一些,比如看到 0.3,0.7 就给它变成 0.1,0.9。
这样对于模型来说,有监督模型不知道是该分人和狗还是该分性别,有了相似性的约束信息之后,就更偏向于人和狗的分类了,因此只凭借较少的标注样本,和一套合理的相似性判断方法,就可以得到更准确的分类模型。

样本了之后半监督可以提升指标,那干嘛不都加上个半监督?

因为有成本啊,你还得考虑怎么算相似,你还得有大量没标注的同分布样本,然后你还得付出对那些没标注样本训练的成本,计算量变大,实现麻烦,有的项目还不一定能找到合适的样本或方法。对于大样本量的有监督算法而言,半监督的收益也不是很大。

比如你已经标了 1 亿条了,你再加上 5 亿未标注的数据可能效果也不会有啥提升,因为哪怕你把这 6 亿都标了指标也差不多,因为已经饱和了。

最后,机器学习是一门实验科学。当然大多数情况下没有绝对的,方法不好也许实际 cover 了多数情况,也会有收益。所以收益多少一般都要试一试,与此相比更多的情况可能是任务比较复杂不太适合这么做,或者没有成本这么做。

参考

  1. 小海星机器学习讲义
  2. https://en.wikipedia.org/wiki/Semi-supervised_learning
  3. https://en.wikipedia.org/wiki/Weak_supervision

一分钟学一个 Python 标准库之 Pathlib

相对于 os.path 来说,Pathlib 极大地简化了路径相关的操作。举个例子来说:

获取当前文件路径下的 default.yaml 文件

import os

os.path.join(os.path.dirname(__file__), "default.yaml")

只需要:

from pathlib import Path

Path(__file__).parent / "default.yaml"

不难看出,pathlib 有几个优点:

  1. 字符少了很多,出 bug 的地方就少了很多;
  2. 和操作系统一样,pathlib 直接使用路径分隔符 / 来操作,而不是 join 的两个参数;
  3. 非常符合直觉,从左到右阅读。而 os.path 使用函数参数,需要来回跳跃理解。相比之下,Pathlib 阅读起来很清晰。

Python3 标准库中所有接受 str 作为路径参数的地方,现在都可以接受一个 path 对象了。如果你使用的第三方库只接受 str 作为参数,
可以通过 path_str = str(path) 来转化一下。

Pathlib 的其他常用方法:

from pathlib import Path

cwd = Path.cwd()  # 获取当前目录
home = Path.home()  # 获取家目录,比如 /home/ubuntu
path = Path("/home/yifei")  # 创建一个新的 path 对象

path.is_dir()  # 是否是目录
path.is_file()  # 是否是普通文件
path.exists()  # 路径是否存在
path.resolve()  # 解析成绝对路径,比如 Path(".").resolve() 相当于 Path.cwd()

path.mkdir(parents=True, exists_ok=True) # 类似 mkdir -p

# 遍历目录
for child in path.iterdir():
    print(child)

# 除此之外,还有几个很甜的方法,省去了 with open 语句
path = Path("/home/ubuntu/readme.txt")
text = path.read_text()
path.write_text(text)

path = Path("/home/ubuntu/image.png")
image = path.read_bytes()
path.write_bytes(image)

>>> path
PosixPath('/home/ubuntu/test.md')
>>> path.name
'test.md'
>>> path.stem
'test'
>>> path.suffix
'.md'
>>> path.parent
PosixPath('/home/ubuntu')
>>> path.parent.parent
PosixPath('/home')
>>> path.anchor
'/'

在这个注意力涣散的年代,想要经常写一些长篇大论的帖子实在太难了,可能一个月才能憋出一篇。
不如分享写简单但是有用的知识点,或许还能经常更新。

以上就是全部内容啦。

参考

  1. https://realpython.com/python-pathlib/
  2. https://docs.python.org/3/library/pathlib.html

爬虫数据存储的一些选择

爬虫是一种典型的「读少写多」的应用场景。而且爬虫产生的数据一般是作为离线数据,而不是在线数据。
也就是说这些数据主要是用于离线数据分析,而不是直接供线上用户查询使用。
即使线上需要使用爬虫数据,也会需要经过清洗处理过后再放到在线库。总之,不会直接供线上直接使用。

本文总结了一些经验和踩过坑,希望能对读者有帮助。

写入通用原则

  1. 要批量写,不要挨个写
  2. 要顺序写,不要随机写

如果我们开了一个多线程的爬虫,然后每个线程每爬到一条数据就调一下 db.insert(item) 插入数据,数据一多就是灾难性的。

首先,每个线程持有一个数据库链接对数据库的负载就产生了不小压力。其次,每条数据都去调用数据库,那么每次插入时间都得
加上数据库的往返时间,也就是 2RTT(round trip time)。再者,每次插入的都是不同的数据,可能在磁盘的不同位置,导致
磁盘的写入时间大部分都花在寻道上了,磁盘 IO 时间会大幅提升。最后,如果是 SQL 型的数据库,默认配置下,
可能还会有事务的影响。

正确的做法是——用队列。不同的线程都把数据先发到一个队列中,不管是你用的语言自带的内存里的 Queue,还是 redis list,
或者是 kafka,都可以。然后由另一个线程或者脚本读取这个队列,把数据整理之后,定期或者定量写入数据库。

这样做基本上解决了上面提到的每个问题。只有存储线程持有数据库链接,每一条数据不会在需要 2RTT,摊薄下来可能是 2RTT / 1000,
数据经过整理后,每次可以都插入统一中类型的数据,磁盘不需要总是在寻道。

当然,这种方案也会引入一些新的问题,需要注意解决:

  1. 处理流程变成了异步,不能实时在数据中看到最新的爬取结果
  2. 多了消息队列的环节,多了丢数据的可能
  3. 如果消息队列是内存性的,不要让消息队列爆了

以上这些问题都是使用 MQ 的常见问题了,这里不再展开。

数据库选型

大概有这些选择:

  1. CSV、JSON、SQLite 或者其他单机文件
  2. SQL 数据库
  3. MongoDB 等文档数据库
  4. HBase 等 Hadoop 生态圈存储
  5. S3 类型对象存储
  6. Kafka 等持久性消息队列

如果你只是简单地「单线程」爬几页数据分析用,那么存个 CSV 或者 JSON 就可以了。如果你开始上多线程,甚至多机了,
既要考虑写入的时候加锁,而且没法分布式写入,单文件存储就不太合适了。

规模再大一点可以考虑 MySQL。虽然 MySQL 是一个 OLTP 数据库,通常意义上来说更适用于线上数据库。但是对于数据量不大的爬虫来说,
比如说总数据量不会超过 100GiB,也已经足够用了。而且 MySQL 可以添加 unique 索引,一定程度上还能帮助解决数据去重的问题。
这里有几点需要注意:

  1. 使用 ORM 创建的表中包含了不少外键约束之类的东西,对于爬来的数据,中间插入的时候可能还不满足这个外键,最好把这个约束删除掉
  2. 2.

有些人喜欢用 MongoDB 来存储爬虫数据,他们给出的理由也很有吸引力——爬虫数据多是半结构化的,而且数据结构可能经常跟着源网站变,
用 MongoDB 这种 schemaless 的文档数据库再合适不过了。然而我个人非常不推荐用 MongoDB,原因如下:

  1. 我觉得定义好表结构不是一个缺点,反而是一个优点,这样能够在开发调试阶段就发现各种异常情况,保证程序稳定
  2. MySQL 的图形化客户端太多了,比如说 Navicat,Sequel Pro 等等。对于小公司来说,这个客户端就已经够用了,根本不需要开发什么单独的管理后台
    相反,MongoDB 基本没有什么特别好用的客户端
  3. MySQL 和 Postgres 也早就支持了 JSON 字段,实在不是特别规整的数据,存在 JSON 字段就行了
  4. 数据分析的第三方库,比如 pandas,对 SQL 的支持都是原生的,一个 read_sql 就把数据读出来了

数据再多一些,或者并发量再大一些的话,可能单独使用 MySQL 就不合适了。这时候你可以对 MySQL 做定期归档,比如说把添加时间在一个月以上的数据
都按日期写入到 Hive 或者 S3 中,然后删除掉 MySQL 中的数据。这样的做法,其实相当于隐式地把 MySQL 作为了一个消息队列,并起到了缓冲的作用。

再者,MySQL 这种毕竟是行式数据库,如果你的数据数值居多,也可以跳过 MySQL,考虑直接存储到列式数据库中。下游的 Spark,Flink 这些消费端可能更喜欢读取列式数据。

最后一种选项是直接使用 Kafka 这种持久化的消息队列作为存储。DDIA 这本书中提到一个有趣的观点:数据库是日志的积分,日志是数据库的导数。
从某种意义上来说,两者所含有的信息是等价的,可以相互转换。所以直接使用消息队列作为数据存储也未尝不可。

总之,对于爬虫这种场景来说,最重要的特点是「读少写多」,按照这个思路去选择问题不大。除了这里提到的一些数据库,还有 Cassandra,FoundationDB 等一些
数据库没有提到,在特定的场景下也都值得考虑。对于存储的选择也不只是一个技术问题,可能更重要的是你的公司现在有什么,选一个比较合适的用就好了。

参考

  1. https://www.zhihu.com/question/479761564
  2. https://www.zhihu.com/question/36110917

Kubernetes 定时任务

参照官方文档,直接比着写就行了:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"  # 也支持 @hourly 这些语法
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            imagePullPolicy: IfNotPresent
            command:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

列出所有 cronjob

kubectl get cronjob [JOB_NAME]

NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     0        <none>          10s

列出 cronjob 所有运行过的实例:

kubectl get jobs --watch

NAME               COMPLETIONS   DURATION   AGE
hello-4111706356   0/1                      0s
hello-4111706356   0/1           0s         0s
hello-4111706356   1/1           5s         5s

需要注意的一点,也是 Linux 上的 cron 命令忽视的一点,K8s 提供了 .spec.concurrentPolicy 选项,
用来选择当上一任务还没有执行完毕时,如何处理并发。

  • Allow,默认选项,只要到了时间就触发,并发执行
  • Forbid,如果上一个任务没有执行完毕,忽略本次任务
  • Replace,如果上一个任务没有执行完毕,用新任务替换掉老任务

参考

  1. https://kubernetes.io/docs/tasks/job/automated-tasks-with-cron-jobs/

React Helmet

React Helmet 用来在 React 程序中更改 header 的内容。

import React from "react";
import "./styles.css";

import { Helmet } from "react-helmet";

export default function App() {
  return (
    <div>
      <Helmet htmlAttributes>
        <html lang="en" />
        <title>Hello from React Helmet</title>
        <meta name="description" content="Basic example" />
      </Helmet>

      <div className="App">
        <h1>React Helmet Basic Example</h1>
      </div>
    </div>
  );
}

React helmet 支持 html, body 和 head 中的 title, base, meta, link, script, noscript, 和 style 标签。

尽量不要用 helmet 去加载 script 和 style,有更好的解决方案。

奇怪的 bug

在元素中直接使用 html 字符串拼接会报错,使用 JS 的字符串差值就可以,醉了。

Helmet expects a string as a child of <title>

<Helmet>
    <title>Product - { item.name }</title>
</Helmet>

// 改成

<Helmet>
    <title>{ `Product - ${ item.name }` }</title>
</Helmet>

参考

  1. https://www.newline.co/@dmitryrogozhny/when-to-use-and-when-to-avoid-using-react-helmet–bf6f62d5
  2. https://github.com/nfl/react-helmet/issues/274

SQLAlchemy 2.0 教程

SQLAlchemy 非常优雅,虽然大多数时候可以认为它是一个 ORM 库,但是实际上它分为了两部分——底层的 Core 和上层的传统 ORM。在 Python 乃至其他语言的大多数 ORM 中,都没有实现很好的分离,比如 django 的 ORM,数据库链接和 ORM 本身完全是混在一起的。

为什么要有 Core

Core 层主要实现了客户端连接池的功能。我们知道,关系型数据库作为现代 Web 应用的核心,它的并发链接能力往往并不是很强,最好不要搞好多短链接过去,所以我们一般是需要一个连接池的。链接池大体分为两种,一种是服务端的,也就是一个专门的连接池中间件,把短链接每次分配一个长链接复用。另一种就是客户端的,一般作为程序的一部分实现。SQLAlchemy 的连接池显然是客户端连接池的一种。在这个连接池中,SQLAlchemy 维护了一定数量的链接,当你调用 connect 的时候,实际上是从池子中取出了一个链接,调用 close 的时候实际上是放回到了池子中一个链接。

创建链接

SQLAlcehmy 统一使用 create_engine 来创建链接(池)。create_engine 的参数是一个数据库的 URL。

from sqlalchemy import create_engine

engine = create_engine(
    "mysql://user:[email protected]:3306/dbname",
    echo=True,  # echo 设为 true 会打印出实际执行的 sql,调试的时候更方便
    future=True,  # 使用 2.0API,向后兼容
    pool_size=5, # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制
    pool_recycle: 3600, # 设置时间以限制数据库多久没连接自动断开。
)

# 创建一个 SQLite 的内存数据库
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)

# mysqlclient (a maintained fork of MySQL-Python)
# pip install mysqlclient
engine = create_engine('mysql+mysqldb://scott:[email protected]/foo?charset=utf8mb4')

直接使用 SQL

CRUD

from sqlachemy import text

with engine.connect() as conn:
    result = conn.execute(text("select * from users"))
    print(result.all())

# result 还可以遍历,每一个行结果是一个 Row 对象
for row in result:
    # row 对象三种访问方式都支持
    print(row.x, row.y)  
    print(row[0], row[1])
    print(row["x"], row["y"])

# 传递参数,使用 `:var` 传递
result = conn.execute(
...         text("SELECT x, y FROM some_table WHERE y > :y"),
...         {"y": 2}
...     )
# 也可以预先编译好参数
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

# 插入的时候比较牛逼,可以直接插入多条
conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
...     )

commit

sqlalchemy 提供两种提交的方式,一种是手工 commit,一种是半自动 commit。官方文档建议使用 engine.begin()。还有一种完全自动的,每一行提交一次的 autocommit 方式,不建议使用。

# "commit as you go"  需要手动 commit
>>> with engine.connect() as conn:
...     conn.execute(text("CREATE TABLE some_table (x int, y int)"))
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
...     )
...     conn.commit()  # 注意这里的 commit

# "begin once"  半自动 commit
>>> with engine.begin() as conn:
...     conn.execute(
...         text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
...         [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
...     )

ORM

session 不是线程安全的。但是一般情况下,web 框架应该在每个请求开始的时候获得一个 session,所以也不是问题。

from sqlalchemy.orm import Session

with Session(engine) as session:
    session.add(foo)
    session.commit()

# 还可以使用 sessionmaker 来创建一个工厂函数,这样就不用每次都输入参数了

from sqlachemy.orm import sessionmaker
Session = sessionmaker(engine)

with Session() as session:
    ...

声明式 API

from datetime import datetime
from sqlalchemy.sql import func
from sqlalchemy import Integer, Column, String, 
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    fullname = Column(String)
    # 对于特别大的字段,还可以使用 defer,这样默认不加载这个字段
    description = deferred(Column(Text))
    # 默认值,注意传递的是函数,不是现在的时间
    time_created = Column(DateTime(Timezone=True), default=datetime.now)
    # 或者使用服务器默认值,但是这个必须在表创建的时候就设置好
    time_created = Column(DateTime(timezone=True), server_default=func.now())
    time_updated = Column(DateTime(timezone=True), onupdate=func.now())

class Address(Base):
    __tablename__ = "address"
    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)

# 调用 create_all 创建所有模型
Base.metadata.create_all(engine)

# 如果只需要创建一个模型
User.__table__.create(engine)

外键

多对一关系的双向映射

from sqlalchemy.orm import relationship

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    # 这里指定了一个 children 属性,指向对应的 Child 对象们,back_ref 指定了在对应的对象中的属性名
    children = relationship("Child", backref="parent")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)
    # 这是数据库中存的字段,类型是 Foreign Key
    parent_id = Column(Integer, ForeignKey("parents.id"))

多对多映射

class Map(Base):
    __tablename__ = "map"
    parent_id = Column(Integer, ForeignKey("parents.id"))
    child_id = Column(Integer, ForeignKey("children.id"))

class Parent(Base):
    __tablename__ = "parents"
    id = Column(Integer, primary_key=True)
    children = relationship("Child", secondary=Map, backref="parents")

class Child(Base):
    __tablename__ = "children"
    id = Column(Integer, primary_key=True)

CRUD

和 1.x API 不用的是,2.0 API 中不再使用 query,而是使用 select 来查询数据。

from sqlalchemy import select

stmt = select(User).where(User.name == "john").order_by(User.id)
# order_by 还可以使用 User.id.desc() 表示逆序排列

result = session.execute(stmt)

# 一般情况下,当选取整个对象的时候,都要用 scalars 方法,否则返回的是一个包含一个对象的 tuple
for user in result.scalars():
    print(user.name)

result = session.execute(select(User.name))
for row in result:
    print(row.name)

# 添加对象直接使用 session.add 方法
session.add(user)

# 如果要获取插入后的 ID
session.flush()   # flush 并不是 commit,并没有提交事务,应该是可重复读,和数据库的隔离级别有关。
print(user.id)

# 删除使用 session.delete
session.delete(user)

# 更新数据需要使用 update 语句
from sqlalchemy import update
# synchronize_session 有三种选项: false, "fetch", "evaluate",默认是 evaluate
# false 表示完全不更新 Python 中的对象
# fetch 表示重新从数据库中加载一份对象
# evaluate 表示在更新数据库的同时,也尽量在 Python 中的对象上使用同样的操作
stmt = update(User).where(User.name == "john").values(name="John").execution_options(synchronize_session="fetch")
session.execute(stmt)

# 或者直接赋值
user.name = "John"
session.commit()

# 这里有一个可能引入 race condition(静态条件)的地方
user.visit_count += 1  # 错误!如果两个进程同时更新这个值,可能导致更新失败
# SQL:Update users set visit_count = 2 where user.id = 1
user.visit_count = User.visit_count + 1  # 注意大写的 U,也就是使用了模型的属性,生成的 SQL 是在 SQL 中 +1
# SQL: Update users set visit_count = visit_count + 1 where user.id = 1

加载外键的值

# 默认情况下,在查询中是不会加载外键的,我们可以使用 joinedload 选项来加载外键,从而避免 N+1 问题
session.execute(select(Child)).scalars().all()  # 没有加载 parent 外键
# 使用 joinedload 加载外键,注意需要使用 unique 方法,这是 2.0 中规定的。
session.execute(select(Child).options(joinedload(Child.parent))).unique().scalars().all()

在 2.0 中,更推荐使用 selectinload 而不是 joinedload,一般情况下,selectinload 都要好,而且不用使用 unique.
从原理上来说,joinedload 是通过 join 的方式加载了对应的数据。而 selectinload 是通过 select * from parents where id in … 来实现的。

session.execute(select(Child).options(selectinload(Child.parent))).scalars().all()

外键的写入

SQLAlchemy 中,直接像处理数组一样处理外键就好了,这点非常方便。

parent.children.append(child1)  # 添加
parent.children.remove(child2)  # 删除
parent.children.clear()
parent.children = []  

JSON 字段的特殊处理

大多数的数据库现在都支持 JSON 字段了,在 SQLAlchemy 中我们也可以直接从字段读取 json 对象或者写入 json 对象。
但是,要记住,千万不要直接对这个 json 对象做 update 并期望写回数据库中,这个是不可靠的。一定要复制后读写,然后在赋值回去。

article = session.get(Article, 1)
tags = copy.copy(article.tags)
tags.append("iOS")
article.tags = tags
session.commit()

批量插入

当需要插入大量数据的时候,如果依然采用逐个插入的方法,那么就会在和数据库的交互上浪费很多时间,效率很低。
MySQL 等大多数数据库都提供了 insert ... values (...), (...) ... 这个批量插入的 API,在 SQLAlchemy 中也可以很好地利用这一点。

# 使用 session.bulk_save_objects(...) 直接插入多个对象

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

# 使用 bulk_insert_mappings 可以省去创建对象的开销,直接插入字典
users = [
    {"name": "u1"},
    {"name": "u2"},
    {"name": "u3"},
]
s.bulk_insert_mappings(User, users)
s.commit()

# 使用 bulk_update_mappings 可以批量更新对象,字典中的 id 会被用作 where 条件,其他字段全部用于更新
session.bulk_update_mappings(User, users)

从 1.X API 迁移到 2.0 API

# session.query(User).get(42)
session.get(User, 42)

# session.query(User).all()
session.execute(select(User)).scalars().all()

# session.query(User).filter_by(name="some_user").one()
session.execute(select(User).filter_by(name="some_user")).scalar_one()

# session.query(User).from_statememt(text("select * from users")).a..()
session.execute(select(User).from_statement(text("selct * from users"))).scalars().all()

# session.query(User).filter(User.name == "foo").update({"fullname": "FooBar"}, synchronize_session="evaluate")
session.execute(update(User).where(User.name == "foo").values(fullname="FooBar").execute_options(synchronize_session="evaluate"))

反射——从数据库创建模型

在 FastAPI 中使用

多进程环境下的使用

由于 Python 中 GIL 的原因,要想利用多核处理器还是需要使用多进程。而多进程中,资源是不能共享的,对应到 SQLAlchemy,也就是连接池是不能共享的。

我们需要手工解决这个问题。

一般情况下,最好还是不要尝试在多个进程中共享同一个 Session。最好是每个进程初始化的时候创建好 Session。

参考

  1. https://docs.sqlalchemy.org/en/14/tutorial/ormrelatedobjects.html
  2. https://stackoverflow.com/questions/25668092/flask-sqlalchemy-many-to-many-insert-data
  3. https://stackoverflow.com/questions/9667138/how-to-update-sqlalchemy-row-entry
  4. https://docs.sqlalchemy.org/en/14/changelog/migration_20.html
  5. https://stackoverflow.com/questions/13370317/sqlalchemy-default-datetime
  6. https://amercader.net/blog/beware-of-json-fields-in-sqlalchemy/
  7. https://stackoverflow.com/questions/26948397/how-to-delete-records-from-many-to-many-secondary-table-in-sqlalchemy
  8. Count by many to many foreign key
  9. https://stackoverflow.com/questions/19175311/how-to-create-only-one-table-with-sqlalchemy/19175907
  10. https://stackoverflow.com/questions/63220132/sqlalchemy-insert-to-mysql-db-unicodeencodeerror-for-cyrlic-data
  11. https://stackoverflow.com/questions/41279157/connection-problems-with-sqlalchemy-and-multiple-processes
  12. https://docs.sqlalchemy.org/en/14/core/pooling.html

写一个 CRUD 还挺难的

让我们只从后端角度出发,考虑写一个简单的博客系统会有哪些问题。这篇文章谈论的并不是某个 Web 框架的 TODO list demo 之类的东西,那都是玩具性质的,而是会谈一谈生产环境中的要考虑的一些实际问题。本文中,我们也不会涉及到像是 MySQL 的几种隔离模式或者是 Kafka 是不是 Exactly Once 这种后端面试常问的八股文,而是从全局考虑一些简单但是又避不开的繁琐问题。

数据库

首先,需要定义一下数据库的表吧。在博客中,我们至少需要两个表:

  1. articles
  2. users

其中 articles 表中应该有一个 author_id 字段关联到 users 表中。那么问题来了,要不要在数据库层面定义物理外键呢?还是仅仅从逻辑上把两者关联起来。

使用物理外键的好处是可以少处理很多异常情况,因为数据库层面已经帮你解决了。但是在开发阶段,当需要删除数据的时候会有很繁琐的依赖问题。在生产环境中,外键也可能带来一些写入的性能问题。

如果只使用逻辑外键呢?那么就需要经常考虑有非法外键的情况,代码中要做好处理,不然就异常满天飞了。

关于数据库的第二个问题,要不要使用 ORM 呢?或许你已经习惯了使用 SQLAlchemy 或是 Hibernate 这类 ORM,并且认为这就是最佳实践。实际上,ORM 并不一定是最好的选择,选择 ORM 可能会有三个缺点:

  1. 意味着你在 SQL 语法之外还要另外学习一门 ORM 中包含的 DSL;
  2. ORM 也不能覆盖所有的 SQL 语句,很多时候你还是得手写 SQL;
  3. ORM 生成的 SQL 经常性能有问题,比如说经常就不小心 N+1 问题或者 select * 了。

甚至于,有的人认为 ORM 技术就是一团泥潭,堪比计算机界的越南战争(美国视角)

当然,不用 ORM 问题也很多,手工拼接 SQL 非常恶心不说,还容易出现 SQL 注入攻击的漏洞。

安全漏洞

正如刚刚提到的 SQL 注入攻击一样,作为一个生产环境的应用,面向大众开放,自然要考虑一些恶意攻击者的访问。

举个例子,你为了方便前端调用,在 API 中留下了一个 order_by=xxx 的参数,为了开发更灵活,这个参数直接对应到了数据库的字段而没有过滤。正常情况下,在你的客户端或者前端代码中,只是简单调用了一下 order_by=create_time,而这个字段是加了索引的,那么皆大欢喜。但是,恶意攻击者可不是这么想的,『我干嘛不调用一下没有索引的字段呢?』。比如说,发送大量的攻击请求到 order_by=first_name 上,那么很快你的数据库就可能被慢查询拖垮了。

这个例子蕴含了一个普遍的道理:灵活和安全不可得兼,你必须针对你的应用,选择一个恰当的地方做好折中。

上面的例子还没有涉及到数据,只是把网站搞挂了而已。另一个容易产生漏洞的地方,就是权限管理了。

假设你有一个用户更新自己文章的 API。那么就应该在其中校验用户传过来的 article_id 是不是他自己的文章。前端校验是不够的,假如恶意用户构造一个请求呢?如果不校验,就可能导致任意修改他人文章的漏洞。再比如,文章状态可能包含草稿和已发布,那么应该只有已发布的文章才可以浏览,但是用户也应该能预览和编辑自己未发布的草稿,你都需要判断是否有适当的权限。问题还可以变得更复杂一点,用户可能分为普通用户和管理员用户,管理员用户又可以查看所有的文章。在稍微大型一点的应用中,文章可能有不同的属性分类,用户更是可能有不同的角色,比如编辑、审核、管理员等等,文章的展现形式也可能多种多样,比如说是完全不可见,还是仅列表隐藏,还是仅列表可见标题,实际访问才会提示不可见/付费内容呢?

后台系统

前面提到了管理员用户,那么就引入了又一个问题——后台管理系统。对于一个博客系统的普通浏览者,看到的可能就是一篇篇的博客,但是对于博客的作者或者管理员来说,一定还有一个后台管理系统来写博客。后台管理系统需要有权限控制,对普通作者,需要限制访问后台的权限,比如说只能访问写作模块,对于管理员,则可以访问用户管理模块。一般来说,后台系统是一个复杂度不亚于前台的部分,这工作量一下就 double 了。现实中的应用,比如说新浪博客,可能还要复杂一些,至少分为三个部分。普通用户看到的博客页面,这部分在水面上,大家都能看到;博主撰写文章,管理自己文章的个人后台;而在新浪博客内部,一定还有一个审核文章,管理用户的内部后台,这工作量一下子就 triple 了。

你可能会说,django admin 这种自动生成的后台系统它不香么?等你尝试着添加一些 JS/CSS 或者是更改一下权限系统就知道了。有人专程写文章批判过:CRUD 代码生成的反模式典范:django admin

性能问题

等你把一切业务功能性的开发都搞定了,是时候考虑性能了。还是以博客系统为例,假如你有一个点赞的功能,那么根据学校里面教的数据库范式,这是一个典型的多对多关系啊,应该弄一个关联表,大概像这样:

users(n) <-> user_liked_articles <-> articles(n)

问题来了,如果首页要展现一个用户最喜欢的文章列表怎么办?在首页上有一个三个表的 join 操作对性能的影响是可想而知的。这时候至少有两种思路:

  1. 放弃正交化,在 article 表中增加一个 num_likes 字段,这样直接只查一个表就出来结果啦。缺点是要在代码中做好冗余信息的同步。
  2. 增加一个缓存,可以缓存整个首页,也可以缓存这条 SQL 的查询结果。缺点是你又得多一个组件,而且查询结果是有延迟的。

这已经是简单到不能再简单的一个小小性能问题了,真正的性能瓶颈可能需要通过使用 profile 工具或者是 wrk 这类的压测工具才能找出并修复。

部署

当你把性能问题也解决差不多的时候,终于到部署了。相对来说,部署还算中规中矩,没有多少幺蛾子。但是至少也要涉及到配置 nginx,申请 SSL 证书这些方面。

在 2021 年的今天,肯定得上 docker 吧,那还得写 dockerfile。如果进一步,要用 k8s 的话,还得写 deployments。或许你还需要配置一下 Jenkins 或者其他 CI 工具……

另外,静态文件怎么放也得考虑一下,就不说 CDN 或者优化图片大小了。至少开发时候你存放的目录得挪到 nginx 对应的目录吧?都是不疼不痒但是必须考虑的杂活儿。

结语

大概写到这里吧,还有好多问题没有涉及,包括但不限于:

  1. 搜索。总不能用 select * like %keyword% 吧?如果上 ES 的话,ES 的查询语法大概也得了解吧,还得考虑到配置从 SQL 数据库到 ES 的同步问题。
  2. 监控和日志。从两个方面来说,一是程序的性能监控,服务挂了得即使知道。另一方面是业务数据的监控,比如说 DAU 是多少。
  3. 测试。代码的单元测试,集成测试等等。
  4. 异步消息。比如说刚刚提到的点赞,被点赞的用户要不要收到一个通知呢?是否要发送邮件通知?邮件通知是不是该搞个消息队列异步操作?redis/kafka?又引入了一个新系统。
  5. 反爬。前面提到的恶意攻击都是以破坏数据为目的的。爬虫稍微好点,只是想把你的数据(全部)搞走。或许你会愿意搜索引擎赶紧收录你的博客文章,但是肯定不希望一个竞争对手把你的所有用户信息全都爬走,然后挨个邀请过去。举两个在反爬上很简单的错误:
    1. 暴露数据库的物理 id。假如你的数据库直接用的自增 id,并且把这个 id 暴露在 API 中,那好了,我直接遍历就完了,所有用户信息拿到。
    2. 没有频控。最起码也要针对 IP 做一个漏桶或者令牌桶的频控吧,不然爬虫流量消耗服务器资源都是很大的问题。

看到这里,希望你对一个系统的复杂和琐碎性能有一个大体的印象,就不要再问出

  1. 10 万块钱能不能做个淘宝?
  2. 4 个月能不能山寨个抖音?
  3. 不就是加个字段么,为啥还要排期?

这种奇葩问题了。总体来说,以上没有什么技术难点。但是每一个点都需要做出取舍折中,特别琐碎。要把每一个小事都考虑好了,还挺复杂的,工作量也不小,而且也不一定是一个人能搞定的。以上所有的这些还都只是后端的问题,另一半前端的问题还完全没有考虑……

对于前端,使用 React 还是 Vue。考虑到 SEO 的话,哪些页面还需要做静态化,这些都是需要考虑的众多问题之一……

最后,或许你还是看不起一个简单的 CRUD 的 web 应用。什么大数据啊、深度学习啊、高并发才是应该掌握的知识嘛!但是,要知道互联网的根基或者说起点从来都是一个简单的后端+前端。你首先做出一个有用的东西,有了用户,慢慢才会产生大数据,才会有高并发的需求。

本文像之前写的「爬虫思路」那篇文章一样,纯意识流瞎写,没有什么架构图,也没有任何参考文章。或许过几个月,我还会写一篇关于前端或者机器学习的文章吧。

放弃 requests,拥抱 httpx

httpx 是新一代的 Python http 请求库,它几乎和 requests 的 API 无缝兼容,几乎不用改代码。相对于 requests 来说,有以下优点:

  • 支持 asyncio, 可以直接 async/await 啦
  • 支持 http 2, requests 一直都不支持
  • 实现了正确的 http 代理
  • Cookie 的 API 更友好
  • 有自己的网络连接池,requests 是基于第三方的 urllib3
  • 文档更有条理,更深入

还有就是 requests 有比较明显的内存泄漏,目前我还没有测试过 httpx,所以就不列到优点里了。

httpx 不支持在 client.request 中使用 proxies,必须在 Client 初始化时候指定,这样做应该是考虑到链接池的实现。

Axios 的基本使用

fetch 虽然在现代浏览器中已经支持很好了,但是 API 还是没有那么好用。比如说增加 GET 参数必须使用 URLSearchParm 这个类。相比之下,Axios 的 API 还是更接近直觉一些。

import axios from 'axios'

axios(url, [config]);
axios(config);
axios.get(url, [config])
axios.delete(url, [config])
axios.put(url, data, [config])
axios.post(url, data, [config])

// 请求中 config 的属性
const config = {
    headers: {}, // Headers
    params: {},  // GET 参数
    data: {},  // 默认情况下 {} 会按照 JSON 发送
    timeout: 0, // 默认是没有 timeout 的
}

// 如果要发送传统的 POST 表单
const config = {
    data: new FormData()
}

// headers 会根据 data 是 json 还是表单自动设置,非常贴心。

// 响应的属性
let res = await axios.get("api")
res = {
    data: {},  // axios 会自动 JSON.parse
    status: 200,
    statusText: "OK",
    headers: {},
    config: {},
    request: {}
}
// 和 fetch 不同的是,res.data 可以直接使用,而 fetch 还需要 await res.json()

// 如果要添加一些默认的设置,使用
axios.defaults.headers.common["X-HEADER"] = "value"
axios.defaults.timeout = 3000
// 具体参见这里: https://github.com/axios/axios/blob/master/lib/defaults.js#L28

重定向与其他错误状态码

对于 3XX 重定向代码,axios 只能跟中,这是浏览器决定的,不是 axios 可以改变的。
对于 4XX 和 5XX 状态码,axios 会抛出异常,可以 catch 住。

上传文件

如果一个表单中包含的文件字段,那么传统的方法是把这个字段做为表单的一部分上传。然而现代的 API 大多是 json 接口,并没有 POST 表单这种格式。
这时候可以单独把上传作为一个接口,通过 POST 表单的方式上传,然后返回上传后的路径。在对应的 API 的接口中附件的字段填上这个路径就好了。

参考

  1. https://stackoverflow.com/questions/4083702/posting-a-file-and-associated-data-to-a-restful-webservice-preferably-as-json

Boto3 访问 S3 的基本用法

以前我以为文档坑爹只有一种方式,那就是没有文档。最近在用 boto3, 才让我认识到了文档的另一种坑爹方式:屁话太多。

具体来说是这样的:Boto3 中有两种 API, 低级和高级。其中低级 API 是和 AWS 的 HTTP 接口一一对应的,
通过 boto3.client(“xxx”) 暴露。高级接口是面向对象的,更加易于使用,通过 boto3.resource(“xxx”) 暴露,
美中不足是不一定覆盖了所有的 API.

坑爹的 AWS 文档中,经常混用 resource 和 client 两套接口,也没有任何提示,文档的首页除了简单的提了一句有两套 API 外再没有单独的介绍了。
在没写这篇文章之前,我的脑子里都是乱的,总觉得 S3(Simple Storage Service) 的这个狗屁接口哪里配得上 Simple 这个词,
底层 API 中一会儿是 listobject, 一会儿是 listobject_v2 的。相反,高级 API 是很简单易用的,却被深深地埋在了一大堆的低级 API 中,
网上的文章也是一会儿 boto3.client, 一会儿 boto3.resource. 除了有人特意提问两者的区别,很难看到有人说这俩到底是啥。

吐槽完毕。

最近总是用到 S3, 在这里记录一下 Boto3 中 S3 相关的简单用法。Boto3 是整个 AWS 的 SDK, 而不只是包括 S3. 还可以用来访问 SQS, EC2 等等。

如果没有特殊需求的话,建议使用高级 API. 本文也从 boto3.resource("s3") 开始。

import boto3

# 隐式创建了一个 session
s3 = boto3.resouce("s3"
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION_NAME,
    endpoint_url=ENDPOINT_URL,
    verify=False
)

# 比较奇葩的是,resource 接口中可以指定 endpoint_url,session 中却不能指定
# 或者可以通过 profile_name 来指定从 .aws 文件夹中读取已经配置好的 secrets
session = boto3.Session(profile_name="xxx")
s3 = session.resource("s3")

# 创建一个 bucket
bucket = s3.create_bucket(Bucket="my-bucket")

# 获得所有的 bucket, boto 会自动处理 API 的翻页等信息。
for bucket in s3.buckets.all():
    print(bucket.name)

# 过滤 bucket, 同样返回一个 bucket_iterator
s3.buckets.fitler()

# 生成一个 Bucket 资源对象
bucket = s3.Bucket("my-bucket")
bucket.name  # bucket 的名字
bucket.delete()  # 删除 bucket

# 删除一些对象
bucket.delete_objects(
    Delete={
        'Objects': [
            {
                'Key': 'string',
                'VersionId': 'string'
            },
        ],
        'Quiet': True|False
    },
)
# 返回结果
{
    'Deleted': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'DeleteMarker': True|False,
            'DeleteMarkerVersionId': 'string'
        },
    ],
    'RequestCharged': 'requester',
    'Errors': [
        {
            'Key': 'string',
            'VersionId': 'string',
            'Code': 'string',
            'Message': 'string'
        },
    ]
}

# 下载文件
bucket.download_file(Key, Filename, ExtraArgs=None, Callback=None, Config=None)

# 下载到文件对象,可能会自动开启多线程下载
with open('filename', 'wb') as data:
    bucket.download_fileobj('mykey', data)

# 上传文件
object = bucket.put_object(Body=b"data"|file, ContentMD5="", Key="xxx")

# 这个方法会自动开启多线程上传
with open('filename', 'rb') as f:
    bucket.upload_fileobj(f, 'mykey')

# 列出所有对象
bucket.objects.all()

# 过滤并返回对象
objects = bucket.objects.filter(
    Delimiter='string',
    EncodingType='url',
    Marker='string',
    MaxKeys=123,
    Prefix='string',
    RequestPayer='requester',
    ExpectedBucketOwner='string'
)

# 创建一个对象
obj = bucket.Object("xxx")
# 或者
obj = s3.Object("my-bucket", "key")

obj.bucket_name
obj.key

# 删除对象
obj.delete()
# 下载对象
obj.download_file(path)
# 自动多线程下载
with open('filename', 'wb') as data:
    obj.download_fileobj(data)
# 获取文件内容
rsp = obj.get()
body = rsp["Body"].read()  # 文件内容
obj.put(Body=b"xxx"|file, ContentMD5="")

# 上传文件
obj.upload_file(filename)
# 自动多线程上传
obj.upload_fileobj(fileobj)

如果想进一步了解,建议直接点开参考文献 2, 阅读下 resouce 相关接口的文档,其他的低级 client 接口完全可以不看。

参考

  1. https://stackoverflow.com/questions/42809096/difference-in-boto3-between-resource-client-and-session
  2. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#service-resource
  3. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html