Posted on:
Last modified:
应该尽量避免使用 ProcessPoolExecutor 和 ThreadPoolExecutor,乃至于尽量少使用 ProcessPool 和 ThreadPool。因为使用这两种类型资源的时候,实际上在使用一个隐式的队列。
在并行处理任务的时候,如果任务的数量显著多于并发数量,那么势必是要使用一个隐式队列排队的。 我们知道,explicit is better than implicit,所以尽量不要使用隐式队列。
具体来说,对于隐式的队列和隐式的进程,我们没有办法去控制队列和进程的退出,信号也发不过去, 会引起很多问题。
隐式的队列往往在队列中传递的是复杂对象,复杂对象有可能持有不可序列化的资源,那么在使用 ProcessPoolExectuor 的时候就会遇到无法放进队列的问题。更糟糕的是,如果资源中有锁,在隐式 fork 的时候行为根本无法预测。传递的复杂对象实际上还是一个具有上下文的变量,他可能假设了 对于当前环境的一些全局变量的访问。
总结一下,任务数和并发数量(线程或进程数)的关系:
<=
并发数,开几个线程自己 map 一下就好了,这种情况下不会造成排队>
并发数,最好使用显式的队列直接使用一个显式队列 q = Queue()
. 然后写一个 Thread:
# 具体的处理函数
def _work():
...
# worker 处理函数,就是不断读消息,并处理
def worker():
while True:
if should_stop():
break
_work()
# 开几个 worker thread
for _ in range(5):
t = threading.Thread(target=worker, daemon=True)
t.start()
类似上面说的单一程序内的情况,分布式环境中的也有类似的两种模式:
Executor 本质上是和 Celery 一样的任务队列,也就是说把每个任务绑定了处理函数再发送到隐式消息队列中, 而不是向显式消息队列中发送一个可序列化的简单消息对象。这种技术选择就造成了队列的状态和控制的不可捉摸性。
如前文所述,隐式的函数绑定往往会隐藏以下问题:
除此之外,使用隐式队列也隐藏了一些问题:
而如果我们在消费者中显式地按照消息类别去查找处理函数,以上问题都不会存在。当我们显式定义消息的时候,也不会存在持有资源的情况。
使用 executor pattern 实际上是一种逃避思考的 hack 方式,这种方式动态性太强,缺乏类型规范,往往会在运行时出线上 bug. 显式的思考有助于在编码阶段就消除以上问题。
Perfer simple messages over bound founctions.
哪怕是我们使用 redis list 来做一个简单的显式队列,如果思考过如上问题之后,稳定性和可预期性也会比使用 celery 这些所谓的 task queue 要好得多。
如上的两种方式实际上都是着眼于处理过程,如果我们把思维方式的焦点放在数据上,那么可以考虑 map reduce 模式。
上面的思路中,多多少少都是面向行的,也就是说一个一个消息处理。而 MapReduce 的思路是面向列的向量化处理方式,某种程度上不需要队列,也就没有上面的问题。
© 2016-2022 Yifei Kong. Powered by ynotes
All contents are under the CC-BY-NC-SA license, if not otherwise specified.
Opinions expressed here are solely my own and do not express the views or opinions of my employer.