Python

Python metaclass 的原理和应用

元编程(meta programming)是一项很神奇的能力,可以通过代码在运行时动态生成代码。元类(meta classes)是 Python 提供的一种元编程的能力。在 Python 中,类也是一种对象,那么类这种对象就是元类的实例,所以我们可以在运行时通过实例化元类动态生成类。

使用 type “函数”

首先我们来了解一下 type,type 可以作为函数使用,用来获得对象的类型:

>>> class Foo:
...     pass
>>> obj = Foo()
>>> obj.__class__
<class '__main__.Foo'>
>>> type(obj)
<class '__main__.Foo'>
>>> obj.__class__ is type(obj)
True

实际上 type 并不是一个函数,而是一个类,我们可以使用 type(type) 来确定一下:

>>> type(type)
<class 'type'>

type 实际上不只是类,而是一个“元类”。我们接下来要可以看到,所有的元类都需要继承自 type。type 是所以类的元类,所以在上面的例子中 x 是 Foo 的实例,Foo 是 type 的实例,type 又是他自己的实例。

file

使用 type 动态创建类

如果传递给 type 的参数是三个的时候,type 的语义就不再是返回给定参数的类,而是实例化生成一个新的类。

type(name: str, bases: tuple, namespace: dict)

第一个参数是新生成的类的名字;第二个参数是新生成的类的基类列表;第三个参数是要个这个类绑定的属性的列表,比如说这个类的一些方法。实际上 class Foo 这种语法只是使用 type 生成类的语法糖而已。

最简单的一个例子,比如我们要创建 Foo[0..9] 这些类,可以这样做:

classes = []
for i in range(10):
    cls = type("Foo%s" % i, tuple(), {})
    classes.append(cls)

# 就像使用普通类一样初始化 Foo0

foo0  = clssses[0]()

如果要实现类的方法,一定要记得同样是要使用 self 变量的。在 Python 中 self 只是一个约定俗称的变量,而不是关键字。

def __init__(self, name):
    self.name = name

def print_name(self):
    print(self.name)

Duck = type("Duck", tuple(), {"__init__": __init__, "print_name": print_name})

duck = Duck("Donald")

duck.print_name()
# Donald

创建自己的元类

首先我们来回顾一下 Python 中类的初始化过程:

foo = Foo()

当这条语句运行的时候,Python 会依次调用 __new____init__ 方法。其中 __new__ 方法在 __init__ 之前调用,并返回已经创建好的新对象,而 __init__ 函数是没有返回结果的。一般情况下,我们都会覆盖 __init__ 方法来对新创建的对象做一些初始化操作。

现在回归到元类上,进入烧脑部分。前面我们说过元类的实例化就是类,所以大致相当于:

Foo = MetaFoo(name, bases, attrs)  # MetaFoo 默认情况下是 type
foo = Foo()

默认情况下,所有类的元类是 type,也就是在这个类是通过 type 来创建的,这和前面说的通过 type 来动态创建类也是一致的。

那么怎样定义一个 MetaFoo 呢?只需要继承自 type 就行了。因为元类的实例化就是类的创建过程,所以在元类中,我们可以修改 __new__ 来在 __init__ 之前对新创建的类做一些操作。

>>> class MetaFoo(type):
...     def __new__(cls, name, bases, namespace):
...         x = super().__new__(cls, name, bases, namespace)  # super实际上就是 type
...         x.bar = 100  # 为这个类增加一个属性
...         return x
...

>>> Foo = MetaFoo("Foo", tuple(), {})  # MetaFoo 在这里就相当于 type 了,可以动态创建类
>>> Foo.bar
100
>>> foo = Foo()
>>> foo.bar
100

在这里我们创建了 MetaFoo 这个元类,他会给新创建的类增加一个叫做 bar 的属性。

在实际的代码中,我们一般还是不会直接动态生成类的,还是调用 class Foo 语法来生成类比较常见一点,这时候可以指定 metaclass 参数就好了。可以通过 Foo(metaclass=MetaFoo) 这种方式来指定元类。

class Foo(metaclass=MetaFoo):
    pass

这种定义和上面的元类用法效果完全是一致的。

一个现实世界的元类例子

在 django.models 或者 peewee 等 ORM 中,我们一般使用类的成员变量来定义字段,这里就用到了元类。

class Field:
    pass

class IntegerField(Field):
    pass

class CharField(Field):
    pass

class MetaModel(type):
    def __new__(meta, name, bases, attrs):
        # 这里最神奇的是:用户定义的类中的 bases 和 attrs 都会作为参数传递进来
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                value.name = '%s.%s' % (name, key)
                fields[key] = value
        for base in bases:
            if hasattr(base, '_fields'):
                fields.update(base._fields)
        attrs['_fields'] = fields
        return type.__new__(meta, name, bases, attrs)

class Model(metaclass=MetaModel):
    pass

这样用户使用的时候就可以这样定义:

>>> class A(Model):
...     foo = IntegerField()
...
>>> class B(A):
...     bar = CharField()
...
>>> B._fields
{'foo': Integer('A.foo'), 'bar': String('B.bar')}

程序在执行的时候就可以直接访问 X._fields,而不用每次都通过反射遍历一次,从而提高效率以及做一些验证。

不过,其实这个完全可以通过装饰器来实现:

def model(cls):
    fields = {}
    for key, value in vars(cls).items():
        if isinstance(value, Field):
            value.name = '%s.%s' % (cls.__name__, key)
            fields[key] = value
    for base in cls.__bases__:
        if hasattr(base, '_fields'):
            fields.update(base._fields)
    cls._fields = fields
    return cls

@model
class A():
    foo = IntegerField()

class B(A):
    bar = CharField()

但是用装饰器的话,就失去了一些类型继承的语义信息。

总结与思考

Python 中的元编程还是一种很强大的特性,但是也比较复杂,有时候很难以理解。实际上,过分的动态特性也导致了 Python 的解释器和静态分析、自动补全等很难优化,因为有好多信息必须到运行时才能知道。

实际上近些年新开发的语言越来越多地加入了静态类型的特性,比如 swift, rust, go 等。就连 Python 本身也增加了 type hinting 的功能,很遗憾的是,这个功能不是强制性的,所以也很难用来提升性能。

元类这块应该是我在 Python 语言方面了解的最后一大块知识了。接下来除了写业务代码不会再深究 Python 了,研究 Golang 去了~

Au revoir, Python!

参考

  1. https://realpython.com/python-metaclasses/
  2. https://stackoverflow.com/questions/392160/what-are-some-concrete-use-cases-for-metaclasses
  3. https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
  4. https://stackoverflow.com/questions/2608708/what-is-the-difference-between-type-and-type-new-in-python

LeetCode 1236/1242 设计一个(多线程)爬虫解法

单线程题目 LeetCode-1236

具体题目就不说了,直接去 LeetCode 上看就好了。1236 要求使用单线程即可,考察的主要是图的遍历。只要注意到对于新发现的节点需要考虑是否已经访问过就好了。在实际生产中,肯定也是要用广度优先,深度优先基本就会陷进一个网站出不来了。

from urllib.parse import urlsplit

class Solution:
    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        domain = urlsplit(startUrl).netloc
        q = [startUrl]
        visited = set([startUrl])
        while q:
            newUrls = []
            for url in q:
                urls = htmlParser.getUrls(url)
                for newUrl in urls:
                    u = urlsplit(newUrl)
                    if u.netloc != domain:
                        continue
                    if newUrl in visited:
                        continue
                    visited.add(newUrl)
                    newUrls.append(newUrl)
            q = newUrls
        return list(visited)

多线程题目 LeetCode-1242

1242 题要求使用多线程来实现。在现实生活中,爬虫作为一个 IO 密集型的任务,使用多线程是一项必须的优化。

在上述的单线程版本中,我们使用了 visited 这个数组来存放已经访问过的节点,如果我们采用多线程的话,并且在每个线程中并发判断某个 URL 是否被访问过,那么势必需要给这个变量加一个锁。而我们知道,在多线程程序中,加锁往往造成性能损失最大,最容易引起潜在的 bug。那么有没有一种办法可以不用显式加锁呢?

其实也很简单,我们只要把需要把并发访问的部分放到一个线程里就好了。这个想法是最近阅读 The Go Programming Language 得到的启发。全部代码如下:

import threading
import queue
from urllib.parse import urlsplit

class Solution:
    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        domain = urlsplit(startUrl).netloc
        requestQueue = queue.Queue()
        resultQueue = queue.Queue()
        requestQueue.put(startUrl)
        for _ in range(5):
            t = threading.Thread(target=self._crawl, 
                args=(domain, htmlParser, requestQueue, resultQueue))
            t.daemon = True
            t.start()
        running = 1
        visited = set([startUrl])
        while running > 0:
            urls = resultQueue.get()
            for url in urls:
                if url in visited:
                    continue
                visited.add(url)
                requestQueue.put(url)
                running += 1
            running -= 1
        return list(visited)

    def _crawl(self, domain, htmlParser, requestQueue, resultQueue):
        while True:
            url = requestQueue.get()
            urls = htmlParser.getUrls(url)
            newUrls = []
            for url in urls:
                u = urlsplit(url)
                if u.netloc == domain:
                    newUrls.append(url)
            resultQueue.put(newUrls)

在上面的代码中,我们开启了 5 个线程并发请求,每个 worker 线程都做同样的事情:

  1. 从 requestQueue 中读取一个待访问的 url;
  2. 执行一个很耗时的网络请求:htmlParser.getUrls
  3. 然后把获取到的新的 url 处理后放到 resultQueue 中。

而在主线程中:

  1. 从 resultQueue 中读取一个访问的结果
  2. 判断每个 URL 是否已经被访问过
  3. 并分发到 requestQueue 中。

我们可以看到在上述的过程中并没有显式使用锁(当然 queue 本身是带锁的)。原因就在于,我们把对于需要并发访问的结构限制在了一个线程中。

在 IPython 中自动重新导入包

在使用 IPython 交互性测试编写的函数的时候,可以打开自动重新导入包的功能,这样每次保存后就可以直接测试了。

In [1]: %load_ext autoreload

In [2]: %autoreload 2

其中三个数字的含义是:

  • %autoreload 0 – 关闭自动重新导入
  • %autoreload 1 – 只在 import 语句重新导入
  • %autoreload 2 – 调用的时候自动重新导入

如果想要在 IPython 中自动启用

$ ipython profile create
$ vim ~/.ipython/profile_default/ipython_config.py
c.InteractiveShellApp.extensions = ['autoreload']
c.InteractiveShellApp.exec_lines = ['%autoreload 2']

macOS 中如何正确安装 pycurl

Reinstall the curl libraries

brew install curl --with-openssl

Install pycurl with correct environment and paths

export PYCURL_SSL_LIBRARY=openssl
pip uninstall pycurl 
pip install --no-cache-dir --global-option=build_ext --global-option="-L/usr/local/opt/openssl/lib" --global-option="-I/usr/local/opt/openssl/include"  pycurl

Python 微型ORM Peewee 教程

Python 中最著名的 ORM 自然是 sqlalchemy 了,但是 sqlalchemy 有些年头了,体积庞大,略显笨重。Peewee 还比较年轻,历史包袱比较少,也仅仅支持 Postgres、MySQL、Sqlite 这三种互联网公司最常见的数据库,所以整体上来说是比较轻量的。

连接和创建数据库

db.init(**args)
db.connect()
db.create_table([Person])

连接池

自动重连,保持连接

在长时间运行的后台脚本使用数据库的时候,可能会遇到连接丢失的问题。peewee 提供了一个 Mixin 可以在连接丢失时候重连,这点比 django 方便多了。

from peewee import MySQLDatabase
from playhouse.shortcuts import ReconnectMixin

class ReconnectMySQLDatabase(ReconnectMixin, MySQLDatabase):
    pass

db = ReconnectMySQLDatabase('my_app', ...)

定义表

peewee 在创建模型的时候就设定了数据库链接,个人感觉这个设计似乎不是很好。不过好在可以先不指定参数,而在实际使用的时候再链接数据库。

import peewee as pw

db = SqliteDatabase(None)  # 这里不配置数据库链接是为了之后方便更改不同环境

class Person(pw.Model):
    name = pw.CharField()
    birthday = pw.DateField()

    class Meta:
        database = db

class Pet(Model):
    owner = ForeignKeyField(Person, backref='pets')
    name = CharField()
    animal_type = CharField()

    class Meta:
        database = db

如果有自引用的外键,可以使用 "self" 来指定。如果有循环引用的外键,可以使用 DeferredForeignKey。
在 django 的 ORM 中,我们可以直接使用 FIELD_id 这样来访问一个外键的 id。这个在 peewee 中也是支持的。但是在设置的时候却不需要加上 _id 的后缀。在使用 where 语句的时候也不需要使用后缀。

event_id = ticket.event_id
ticket.event = new_event_id
Ticket.select().where(event == desired_event_id)

执行裸SQL

database.execute_sql()

增删改查

读取数据

基本的语法是 Model.select(fields).where(**coditions).get(). 或者直接简写成 Model.get()

# peewee 只会查询一次数据库,不管迭代多少次。
query = Pet.select().where(Pet.animal_type == 'cat')
for pet in query:
    print(pet.name, pet.owner.name)  # 注意这里有 N+1 问题,N 指的是获取 owner.name

grandma = Person.get(Person.name == 'Grandma L.')

# in 查询使用 in_ 方法
Pet.select().where(Pet.id.in_([1,2]))

# 对于 id 可以直接使用 get_by_id

Person.get_by_id(100)

# 使用 get_or_none 阻止抛出异常

Person.get_or_none()

# 可以使用 join 解决 N+1 问题
query = (Pet
         .select(Pet, Person)
         .join(Person)
         .where(Pet.animal_type == 'cat'))
         .order_by(Pet.name)  # 或者 Pet.name.desc() 逆序排列

for pet in query:
    print(pet.name, pet.owner.name)

可以直接使用 | 来作为查询条件,这个相比 django 需要使用 Q 来说,设计地非常优雅。

d1940 = date(1940, 1, 1)
d1960 = date(1960, 1, 1)
query = (Person
         .select()
         .where((Person.birthday < d1940) | (Person.birthday > d1960)))

for person in query:
    print(person.name, person.birthday)

# prints:
# Bob 1960-01-15
# Grandma L. 1935-03-01

query.count()  #  返回记录的大小

get_or_create

peewee 模仿 django 实现了 get_or_create 的方法。注意他的参数是 Django 风格的,而不是 peewee 的 model.attr == xxx 的风格。

person, created = Person.get_or_create(
    first_name=first_name,
    last_name=last_name,
    defaults={'dob': dob, 'favorite_color': 'green'})

iterator

对于返回结果过多的查询,可以使用 iterator 方法。

返回简单对象

插入数据

跟 django 的 ORM 貌似是一样的。使用 Model.create() 或者 Model.save() 或者 Model.insert()

from datetime import date

# 使用 save
uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15))
uncle_bob.save() # bob is now stored in the database

# 使用 create
grandma = Person.create(name='Grandma', birthday=date(1935, 3, 1))
bob_kitty = Pet.create(owner=uncle_bob, name='Kitty', animal_type='cat')  # 带有外键的宠物

# 使用 bulk_create
User.bulk_create(users, batch_size=100)

# 使用 insert
User.insert(username="mickey").execute()

# 使用 insert many。或者使用 tuple 也可以
data_source = [
    {'field1': 'val1-1', 'field2': 'val1-2'},
    {'field1': 'val2-1', 'field2': 'val2-2'},
    # ...
]

# Fastest way to INSERT multiple rows.
MyModel.insert_many(data_source).execute()

# We can INSERT tuples as well...
data = [('val1-1', 'val1-2'),
        ('val2-1', 'val2-2'),
        ('val3-1', 'val3-2')]

# But we need to indicate which fields the values correspond to.
MyModel.insert_many(data, fields=[MyModel.field1, MyModel.field2]).execute()

更新数据

可以使用 Model.update 或者 model.save 更新数据。

# 使用 save 更新
herb_fido.owner = uncle_bob
herb_fido.save()

# 使用 update 更新
query = Tweet.update(is_published=True).where(Tweet.creation_date < today)

# 批量更新数据
# First, create 3 users with usernames u1, u2, u3.
u1, u2, u3 = [User.create(username='u%s' % i) for i in (1, 2, 3)]

# Now we'll modify the user instances.
u1.username = 'u1-x'
u2.username = 'u2-y'
u3.username = 'u3-z'

# Update all three users with a single UPDATE query.
User.bulk_update([u1, u2, u3], fields=[User.username])

需要注意的是,在使用 update 的时候千万不要在 Python 中使用计算再更新,要使用 SQL 语句来更新,这样才能具有原子性。

错误做法

>>> for stat in Stat.select().where(Stat.url == request.url):
...     stat.counter += 1
...     stat.save()

正确做法

>>> query = Stat.update(counter=Stat.counter + 1).where(Stat.url == request.url)
>>> query.execute()

删除数据

可以使用 model.delete_instance 或者 Model.delete。

# 使用 object.delete_instance
herb_mittens.delete_instance()

# 使用 Model.delete
Tweet.delete().where(Tweet.creation_date < one_year_ago).execute()

一些有用的拓展

模型转换成字典

除了在查询的时候使用 model.dicts 以外,还可以使用 model_to_dict(model) 这个函数。

>>> user = User.create(username='charlie')
>>> model_to_dict(user)
{'id': 1, 'username': 'charlie'}

从数据库生成模型

最后也是最牛逼的一点,可以使用 pwiz 工具从已有的数据库产生 peewee 的模型文件:

python -m pwiz -e postgresql charles_blog > blog_models.py

参考

  1. https://stackoverflow.com/questions/45345549/peewee-mysql-server-has-gone-away-error/57797698#57797698

Python 环境变量的一个坑

Python 中可以使用 os.environ 操作环境变量,前几天看到了其他几个函数 os.getenv 和 os.putenv。然而 os.putenv 是一个大坑,os.putenv 之后,在后面的 os.getenv 中并不能读出来。囧

# 参考

1. https://mail.python.org/pipermail/python-list/2013-June/650294.html

如何调试 Python 的 Core Dump

如果需要记录 Core Dump 的原因,首先需要使用 faulthandler 参数启动 Python

“`
python -X faulthandler main.py
“`

出 core 之后,可以使用 gdb 调试

“`
gdb python core
“`

参考

1. https://stackoverflow.com/questions/2663841/python-tracing-a-segmentation-fault/2664232#2664232

使用 prctl 在父进程退出的时候安全退出子进程

在 Linux 中, 当子进程退出的时候, 父进程可以收到信号, 但是当父进程退出的时候, 子进程并不会受到信号. 这样就造成了在父进程崩溃的时候, 子进程并不能同时退出, 而是一直会在后台运行, 比如下面的例子:

“`
import os
import time

def loop_print():
import time
while True:
print(‘child alive, %s’ % time.time())
time.sleep(1)

try:
pid = os.fork()
except OSError:
pass

if pid != 0: # parent
print(‘parent sleep for 2’)
time.sleep(2)
print(‘parent quit’)
else:
loop_print()
“`

当父进程退出的时候, 子进程一直在不断地 print, 而没有退出.

# naive 的方法, 使用 multiprocessing 库

昨天我已经吐槽过标准库的 multiprocessing 有很多坑, 不出所望, 在这个问题上 multiprocessing 依然提供了半个解法, 只解决了一半问题……

在使用 multiprocessing 库创建进程的时候, 可以设置 `Process.daemon = True`, 这个属性又是模仿 threading 库的 API 来的.

正常情况下, 当一个程序收到 SIGTERM 或者 SIGHUP 等信号的时候, multiprocessing 会调用每个子进程的 terminate 方法, 这样会给每个子进程发送 SIGTERM 信号, 子进程就可以优雅退出. 然而, 当异常发生的时候, 父进程挂了, 比如说收到了 SIGKILL 信号, 那么子进程就得不到收割, 也就变成了孤儿进程.

所以说, multiprocessing 库只解决了半个问题, 真遇到问题的时候就会坑你一把.

# 正确解决方法

Linux 提供了 prctl 系统调用, 可以由子进程向内核注册父进程退出时候收到什么信号, 我们只要注册一个 SIGTERM 信号就好了.

在 Python 中可以使用 python-prctl 这个包.

## 安装

“`
# apt install libcap-dev && pip install python-prctl
“`

## 使用

以上面的程序为例:

“`
import os
import time

def loop_print():
import time
import prctl
import signal
prctl.set_pdeathsig(signal.SIGTERM)
while True:
print(‘child alive, %s’ % time.time())
time.sleep(1)

try:
pid = os.fork()
except OSError:
pass

if pid != 0: # parent
print(‘parent sleep for 2’)
time.sleep(2)
print(‘parent quit’)
else:
loop_print()
“`

这次我们看到, 在父进程退出的同时, 子进程也推出了.

“`
parent sleep for 2
child alive, 1539676057.5094635
child alive, 1539676058.5105338
parent quit
“`

吐槽一下 Python 混乱的 threading 和 multiprocessing

最近要写一个库往 influxdb 中打点, 因为要被很多程序使用, 而又要创建新的进程, 为了避免引起使用方的异常, 简单深入了解了下 Python 的并发控制, 这才发现标准库真是坑. 之前没过多考虑过, 只是凭感觉在 CPU 密集的时候使用 multiprocessing, 而默认使用 threading, 其实两个还是有很多不一样的, 除了都是并发执行以外还有很大的不同. Python 中试图用 threading 和 multiprocessing 实现类似的接口来统一两方面, 结果导致更混乱了. 本文探讨几个坑.

# 在多线程环境中 fork

首先不谈 Python, 我们思考一下, 在多线程环境下如果执行 fork 会怎样? 在新的进程中, 会不会所有线程都在运行? 答案是否定的, **在 fork 之后, 只有执行 fork 的线程在运行**, 而其他线程都不会运行. 这是 POSIX 标准规定的:

> A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources. Consequently, to avoid errors, the child process may only execute async-signal-safe operations until such time as one of the exec functions is called. Fork handlers may be established by means of the pthread_atfork() function in order to maintain application invariants across fork() calls.

但是这时候其他线程持有的锁并不会自动转化到当前线程, 所以可能造成死锁. 关于在多线程程序中执行 fork 会造成的问题, 有好多文章有详细的讨论:

1. http://www.linuxprogrammingblog.com/threads-and-fork-think-twice-before-using-them
2. https://stackoverflow.com/questions/1073954/fork-and-existing-threads/1074663#1074663

# 在 python 的 daemon thread 中 fork 又会怎样

在 Python 中可以把线程设置为 daemon 状态, 如果一个进程中只有 daemon thread, 这个进程就会自动退出. 那么问题来了, 如果我们 daemon thread 中执行 fork 会怎样呢?

理论上来说, 既然 fork 之后只有一个线程, 而这个线程又是 daemon 线程, 那么显然这个进程应该直接退出的, 然而并不会这样, 这个进程会一直运行, 直到该线程退出. 这是因为 fork 之后, 唯一的线程自动成为了 main thread, 而 Python 中硬编码了 main thread 不是 daemon thread, 所以这个线程不会退出.

参考:

1. https://stackoverflow.com/questions/31055960/is-it-a-python-bug-that-the-main-thread-of-a-process-created-in-a-daemon-thread

# 在新创建的进程中创建线程又会怎样

在普通进程中, 进程在所有非daemon 的线程退出之后才会推出, 但是在新创建的进程中, 不论创建的线程是 daemon thread 还是不是 daemon thread 都会在主线程退出后退出. 这是 Python 的一个 [bug](https://bugs.python.org/issue18966), 这个 bug 最早在 2013-09-08 01:20 报告出来, 而直到 2017-08-16 18:54 的 Python 3.7 才修复…

如何复现这个 bug

“`
#!/usr/bin/env python

from multiprocessing import Process
from threading import Thread
from time import sleep

def proc():
mythread = Thread(target=run)
mythread.start()
print(‘Thread.daemon = %s’ % mythread.daemon)
sleep(4)
#mythread.join()

def run():
for i in range(10):
sleep(1)
print(‘Tick: %s’ % i)

if __name__ == ‘__main__’:
p = Process(target=proc)
p.start()
print(‘Process.daemon = %s’ % p.daemon)
“`

下面大概说下这个 bug 的原因:

1. 普通进程会调用 `sys.exit()` 退出, 在这个函数中会调用 `thread.join()` 也就是会等待其他线程运行结束
2. 在 Python 3.4 之前, 默认只会使用 fork 创建线程, 而对于 fork 创建的线程, 会使用 `os._exit()` 退出, 也就是不会调用 `thread.join()`. 所以也就不会等待其他线程退出
3. 在 Python 3.4 中引入了对 `spawn` 系统调用的支持, 可以通过 `multiprocessing.set_start_method` 来设定创建进程使用的系统调用. 而使用 `spawn` 调用创建的进程会通过 `sys.exit()` 退出, 也就避免了这个 bug 的影响. 而使用 `fork` 创建的进程依然受到这个 bug 的影响.
4. 在 Python 3.7 中终于在添加了 `thread._shutdown` 的调用, 也就是会 join 其他的 thread.

# fork vs spawn 造成的 OS 平台差异性

我们知道, 在 `*nix` 系统中创建一个一个新的进程可以使用系统调用 `fork`, 父进程的所有资源都会被复制到子进程中, 当然是 Copy On Write 的. 如果要执行一个新的程序, 必须在 `fork` 之后调用 `exec*` 家族的系统调用, 后来 Linux 中添加了 `spawn` 系统调用, `spawn` 和 `fork` 的不同是, 他是从头创建了一个新的子程序, 而不是像 `fork` 一样复制了一份父进程.

而在 Windows 上, 从来没有类似 `fork` 的系统调用, 只有类似 `spawn` 的系统调用, 也就是从头创建一个新的程序.

对于 Python 的影响. 在 `*nix` 操作系统上, 当使用 multiprocessing 的时候, 默认调用的是 fork, 在新的进程中所有导入的包都已经在了, 所以不会再 import 一次. 而在 Windows 系统上, 使用 multiprocessing 创建新的进程时, 所有包都会被在新进程中重新 import 一遍, 如果 import 操作是对外部系统有副作用的, 就会造成不同.

当然如上文所述, 在 Python 3.4 之后可以选择创建进程时使用的系统调用, 如果选择了 `spawn`, 那么在各个平台上行为就是统一的了.

参考:

1. 为什么要区别 fork 和 exec: https://www.zhihu.com/question/66902460
2. fork 和 spawn 造成的有趣影响: https://zhuanlan.zhihu.com/p/39542342
2. https://stackoverflow.com/questions/38236211/why-multiprocessing-process-behave-differently-on-windows-and-linux-for-global-o

# fork 和 asyncio

多进程和 Event Loop 也可能引起一些问题, [这篇文章](http://4fish.xyz/posts/asyncio-concurrency/) 给了一个很好的例子:

假设现在有一个场景,主进程运行着一个event loop,在某个时候会fork出一个子进程,子进程再去运行一个新建的event loop:

“`
async def coro(loop):
pid = os.fork()
if pid != 0: # parent
pass
else: # child
cloop = asyncio.new_event_loop()
cloop.run_forever()

loop = asyncio.get_event_loop()
asyncio.ensure_future(coro(loop), loop=loop)
loop.run_forever()
loop.close()
“`

这段代码看起来没有什么问题, 在子进程中开了一个新的 Event Loop, 然而在 Python 3.5 和以下, 在真正运行时会报错:

“`

cloop.run_forever()
File “/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py”, line 411, in run_forever
‘Cannot run the event loop while another loop is running’)
RuntimeError: Cannot run the event loop while another loop is running
“`

原因就在于标准库的 Event Loop 没有考虑多进程环境, 只是使用一个 thread local 来表示当前的 loop, 在多线程条件下, 这样当然是可以的, 但是在 fork 之后, 数据结构全部都得到了复制, 因此子进程就会检查到已经有 event loop 在运行了.

在 Python 3.6 中, 这个问题得到了简单粗暴的修复, 在每个 loop 上都标记一个 pid, 检查的时候再加上 pid 验证是不是当前进程就好了.

总而言之, 尽量不要同时使用多进程和多线程, 如果非要用的话, 首先尽早创建好需要的进程, 然后在进程中再开始创建线程或者开启 Event Loop.

还有一篇文章没看, 用空了再看下吧, 是讲 multiprocessing.Pool 的坑:

1. https://codewithoutrules.com/2018/09/04/python-multiprocessing/

Python 高性能请求库 aiohttp 的基本用法

aiohttp 是 Python 异步编程最常用的一个 web 请求库了, 依托于 asyncio, 性能非常吓人. 下面列举几个常见的用法:

# 最基础: 并发下载网页

“`
import aiohttp
import asyncio

async def fetch(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
urls = [
‘http://python.org’,
‘https://google.com’,
‘http://yifei.me’
]
tasks = []
async with aiohttp.ClientSession() as session:
for url in urls:
tasks.append(fetch(session, url))
htmls = await asyncio.gather(*tasks)
for html in htmls:
print(html[:100])

if __name__ == ‘__main__’:
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
“`