Posted on:
Last modified:
最近要写一个库往 influxdb
中打点,因为要被很多程序使用,而又要创建新的进程,为了避免
引起使用方的异常,简单深入了解了下 Python 的并发控制,这才发现标准库真是坑。之前没多考虑过,
只是凭感觉在 CPU 密集的时候使用 multiprocessing, 而默认使用 threading, 其实两个还是有很多
不一样的,除了都是并发执行以外还有很大的不同。Python 中试图用 threading 和 multiprocessing
实现类似的接口来统一两方面,结果导致更混乱了。本文探讨几个坑。
首先不谈 Python, 我们思考一下,在多线程环境下如果执行 fork 会怎样?在新的进程中,会不会 所有线程都在运行?答案是否定的,在 fork 之后,只有执行 fork 的线程在运行, 而其他线程 都不会运行。这是 POSIX 标准规定的:
A process shall be created with a single thread. If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources. Consequently, to avoid errors, the child process may only execute async-signal-safe operations until such time as one of the exec functions is called. Fork handlers may be established by means of the
pthread_atfork()
function in order to maintain application invariants across fork() calls.
也就是说。fork 的时候,整个进程都会被复制,但是子进程在执行 exec
之前不能做很多操作。
其他线程持有的锁并不会自动转化到当前线程,所以可能造成死锁。关于在多线程程序中执行 fork 会造成的问题,有好多文章有详细的讨论:
在 Python 中可以把线程设置为 daemon 状态,如果一个进程中只有 daemon thread, 这个进程就会 自动退出。那么问题来了,如果我们 daemon thread 中执行 fork 会怎样呢?
直觉上来说,既然 fork 之后只有一个线程,而这个线程又是 daemon 线程,那么显然这个进程应该 直接退出的,然而并不会这样,这个进程会一直运行,直到该线程退出。这是因为 fork 之后,唯一的 线程自动成为了 main thread, 而 Python 中硬编码了 main thread 不是 daemon thread, 所以这个 线程不会退出。
在普通进程中,进程在所有非 daemon 的线程退出之后才会退出,但是在新创建的进程中,不论创建的 线程是 daemon thread 或者不是 daemon thread 都会在主线程退出后退出。这是 Python 的一个 bug, 这个 bug 最早在 2013-09-08 报告出来,而直到 2017-08-16 的 Python 3.7 才修复...
如何复现这个 bug:
from multiprocessing import Process
from threading import Thread
from time import sleep
def proc():
mythread = Thread(target=run)
mythread.start()
print("Thread.daemon = %s" % mythread.daemon)
sleep(4)
# mythread.join()
def run():
for i in range(10):
sleep(1)
print("Tick: %s" % i)
if __name__ == "__main__":
p = Process(target=proc)
p.start()
print("Process.daemon = %s" % p.daemon)
这个 bug 的原因是 fork 出来的子进程直接调用了 os._exit 退出,而没有调用 sys.exit,而 thread.join 是在 sys.exit 中调用的。具体来说:
sys.exit()
退出,在这个函数中会调用 thread.join()
来等待子线程结束os._exit()
退出,也就是不会调用 thread.join()
. 所以也就不会等待其他线程退出spawn
系统调用的支持,可以通过 multiprocessing.set_start_method
来设定创建进程使用的系统调用。而使用 spawn
调用创建的进程会通过 sys.exit()
退出,
也就避免了这个 bug 的影响。而使用 fork
创建的进程依然受到这个 bug 的影响。thread._shutdown
的调用,也就是会 join 其他的 thread.我们知道,在 *nix
系统中创建一个一个新的进程可以使用系统调用 fork
, 父进程的所有资源
都会以 Copy On Write 的形式复制到子进程中。如果要执行一个新的程序,必须在 fork
之后调用
exec*
家族的系统调用,后来 Linux 中添加了 spawn
系统调用,spawn
和 fork
的不同是,
spawn 从头创建了一个新的子程序,而不是像 fork
一样复制了一份父进程。
而在 Windows 上,没有类似 fork
的系统调用,只有类似 spawn
的系统调用。
对于 Python 来说,在 *nix
操作系统上,当使用 multiprocessing 的时候,默认调用的是 fork,
新的进程中所有导入的包都已经在了,不需要再 import 一次。而在 Windows 系统上,使用 multiprocessing
创建新的进程时,所有包都需要在新进程中重新 import 一遍,如果 import 操作对外部系统不是
幂等的,有副作用,就会造成平台差异。
当然,如上文所述,在 Python 3.4 之后可以选择创建进程时使用的系统调用,如果统一选择了 spawn
,
那么在各个平台上行为就是统一的了。
多进程和 Event Loop 也可能引起一些问题,这篇文章 给了一个很好的例子:
假设现在有一个场景,主进程运行着一个 event loop,在某个时候会 fork 出一个子进程,子进程 再去运行一个新建的 event loop:
async def coro(loop):
pid = os.fork()
if pid != 0: # parent
pass
else: # child
cloop = asyncio.new_event_loop()
cloop.run_forever()
loop = asyncio.get_event_loop()
asyncio.ensure_future(coro(loop), loop=loop)
loop.run_forever()
loop.close()
这段代码看起来没有什么问题,在子进程中开了一个新的 Event Loop, 然而在 Python 3.5 和以下, 在真正运行时会报错:
...
cloop.run_forever()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 411, in run_forever
"Cannot run the event loop while another loop is running")
RuntimeError: Cannot run the event loop while another loop is running
原因就在于标准库的 Event Loop 没有考虑多进程环境,只是使用一个 thread local 来表示当前的 loop, 在多线程条件下,这样当然是可以的,但是在 fork 之后,数据结构全部都得到了复制,因此 子进程就会检查到已经有 event loop 在运行了。
在 Python 3.6 中,这个问题得到了简单粗暴的修复,在每个 loop 上都标记一个 pid, 检查的时候 再加上 pid 验证是不是当前进程就好了。
总而言之,尽量不要同时使用多进程和多线程,如果非要用的话,首先尽早创建好需要的进程,然后 在进程中再开始创建线程或者开启 Event Loop.
© 2016-2022 Yifei Kong. Powered by ynotes
All contents are under the CC-BY-NC-SA license, if not otherwise specified.
Opinions expressed here are solely my own and do not express the views or opinions of my employer.
友情链接: MySQL 教程站