Posted on:
Last modified:
应该尽量避免使用 ProcessPoolExecutor 和 ThreadPoolExecutor,乃至于尽量少使用 ProcessPool 和 ThreadPool。因为使用这两种类型资源的时候,实际上在使用一个隐式的队列。
在并行处理任务的时候,如果任务的数量显著多于并发数量,那么势必是要使用一个隐式队列排队的。 我们知道,explicit is better than implicit,所以尽量不要使用隐式队列。
具体来说,对于隐式的队列和隐式的进程,我们没有办法去控制队列和进程的退出,信号也发不过去, 会引起很多问题。
隐式的队列往往在队列中传递的是复杂对象,复杂对象有可能持有不可序列化的资源,那么在使用 ProcessPoolExectuor 的时候就会遇到无法放进队列的问题。更糟糕的是,如果资源中有锁,在隐式 fork 的时候行为根本无法预测。传递的复杂对象实际上还是一个具有上下文的变量,他可能假设了 对于当前环境的一些全局变量的访问。
总结一下,任务数和并发数量(线程或进程数)的关系:
<=
并发数,开几个线程自己 map 一下就好了,这种情况下不会造成排队>
并发数,最好使用显式的队列直接使用一个显式队列 q = Queue()
. 然后写一个 Thread:
# 具体的处理函数
def _work():
...
# worker 处理函数,就是不断读消息,并处理
def worker():
while True:
if should_stop():
break
_work()
# 开几个 worker thread
for _ in range(5):
t = threading.Thread(target=worker, daemon=True)
t.start()
类似上面说的单一程序内的情况,分布式环境中的也有类似的两种模式:
Executor 本质上是和 Celery 一样的任务队列,也就是说把每个任务绑定了处理函数再发送到隐式 消息队列中,而不是向显式消息队列中发送一个可序列化的简单消息对象。这种技术选择就造成了 队列的状态和控制的不可捉摸性。
如前文所述,隐式的函数绑定往往会隐藏以下问题:
除此之外,使用隐式队列也隐藏了一些问题:
而如果我们在消费者中显式地按照消息类别去查找处理函数,以上问题都不会存在。当我们显式定义 消息的时候,也不会存在持有资源的情况。
使用 executor pattern 实际上是一种逃避思考的 hack 方式,这种方式动态性太强,缺乏类型规范, 往往会在运行时出线上 bug. 显式的思考有助于在编码阶段就消除以上问题。
Prefer simple messages over bound functions.
哪怕是我们使用 redis list 来做一个简单的显式队列,如果思考过如上问题之后,稳定性和可预期 性也会比使用 celery 这些所谓的 task queue 要好得多。
如上的两种方式实际上都是着眼于处理过程,如果我们把思维方式的焦点放在数据上,那么可以考虑 map reduce 模式。
上面的思路中,多多少少都是面向行的,也就是说一个一个消息处理。而 MapReduce 的思路是面向列 的向量化处理方式,某种程度上不需要队列,也就没有上面的问题。
© 2016-2022 Yifei Kong. Powered by ynotes
All contents are under the CC-BY-NC-SA license, if not otherwise specified.
Opinions expressed here are solely my own and do not express the views or opinions of my employer.
友情链接: MySQL 教程站