$ ls ~yifei/notes/

curio

Posted on:

Last modified:

Curio 是一个神奇的 Python 库,它完全面向 Python 3.5 增加的 async/await 语法,从低层就没有 使用 callback 的语法,因此相比 asyncio 来说设计更简单,API 更优雅,性能却更好。

Curio 来自于作者 Dabeaz 的一个演讲。其中的关键信息有:

Python 中异步编程的发展历史

Polling -> callback -> Futures/Deferred -> Generators -> Inlined Callbacks
-> Coroutine -> yield from -> asyncio -> async/await

或许我们不需要中间这些步骤,可以直接在 polling 的基础上实现 async/await. async 也不应该 是一个实现、一个库、或者 asyncio,而应该是一个接口。

一个例子

Curio 的文档给了一个很好的例子,下面总结一下这个例子。这个例子模拟了孩子在玩 Minecraft, 而家长在催促孩子该出发了的情景。

import curio

async def countdonw(n):
    while n > 0:
        print('T-minus', n)
        await cuiro.sleep(1)  # 在异步编程中,不能使用同步代码中的 time.sleep
        n -= 1

start_evt = cruio.Event()

async def frined(name):
    print('你好,我叫', name)
    print('开始玩 Minecraft')
    try:
        await curio.sleep(1000)
    except curio.CancelledError:
        print(name, '回家了')
        raise

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n-1) + fib(n-2)

async def kid():
    while True:
        try:
            print('可以玩了吗?')
            await curio.timeout_after(1, start_evt.wait)  # 等待一秒
            break
        except curio.TaskTimeout:
            print('啊啊啊')
    print('在 Minecraft 中造建筑')
    async with curio.TaskGroup() as f:
        await f.spawn(friend, 'Max')  # 生成 friend 协程
        await f.spawn(friend, 'Lilikan')
        await f.spawn(friend, 'Thomas')
        try:
            total = 0:
            for i in range(10):
                total += await curio.run_in_process(fib, n)   # 把计算密集型的任务交给另一个进程
                await curio.run_in_thread(time.sleep, n)  # 会阻塞的,放到其他线程
            await curio.sleep(1000)  # 模拟延迟
        except curio.CancelledError:
            print('好的,保存中')  # 如果被取消了,做好善后工作
            raise  # 需要重新跑出异常

async def parent():
    kid_task = await curio.spawn(kid)  # 打开 kid 协程,这里并不会等待 kid 运行完毕
    await curio.sleep(5)

    print('去玩吧')
    await start_evt.set()
    await curio.sleep(5)

    print('该出发了')
    count_task = await curio.spawn(countdown, 10)  # 开始倒计时协程
    await count_task.join()  # 使用 join 等待一个协程完成

    print('真的要走了')
    try:
        await curio.timeout_after(10, kid_task.join)  # 等待 kid 协程运行完毕,最多十秒
    except curio.TaksTimeout:
        print('警告过你了')
        await kid_task.cancel()
    print('出发了')

if __name__ == '__main__':
    curio.run(parent, with_monitor=True)  # 开始执行
    # with_monitor 可以在另一个窗口实时观察有多少协程在运行
    # python3 -m curio.monitor

curio monitor 的使用

打开 curio

$ python -m cuiro.monitor
Curio Monitor: 4 tasks running
Type help for commands
curio>

使用 ps 列出当前正在执行的任务

curio > ps
Task   State        Cycles     Timeout Task
------ ------------ ---------- ------- --------------------------------------------------
1      FUTURE_WAIT  1          None    Monitor.monitor_task
2      READ_WAIT    1          None    Kernel._run_coro.<locals>._kernel_task
3      TASK_JOIN    3          None    parent
4      TIME_SLEEP   1          None    kid
curio >

使用 where 查看每个协程的调用栈

curio > w 3
Stack for Task(id=3, name='parent', <coroutine object parent at 0x1024796d0>, state='TASK_JOIN') (most recent call last):
  File "hello.py", line 23, in parent
    await kid_task.join()
  File "/Users/beazley/Desktop/Projects/curio/curio/task.py", line 106, in join
    await self.wait()
  File "/Users/beazley/Desktop/Projects/curio/curio/task.py", line 117, in wait
    await _scheduler_wait(self.joining, 'TASK_JOIN')
  File "/Users/beazley/Desktop/Projects/curio/curio/traps.py", line 104, in _scheduler_wait
    yield (_trap_sched_wait, sched, state)

curio > w 4
Stack for Task(id=4, name='kid', <coroutine object kid at 0x102479990>, state='TIME_SLEEP') (most recent call last):
  File "hello.py", line 12, in kid
    await curio.sleep(1000)
  File "/Users/beazley/Desktop/Projects/curio/curio/task.py", line 440, in sleep
    return await _sleep(seconds, False)
  File "/Users/beazley/Desktop/Projects/curio/curio/traps.py", line 80, in _sleep
    return (yield (_trap_sleep, clock, absolute))

curio >

使用 cancel 取消一个协程

curio > cancel 4
Cancelling task 4
*** Connection closed by remote host ***

curio 的 API

coroutine 与 kernel

使用async def来创建一个新的 coroutine. 每个 coroutine 不能够单独执行,而是需要通过一个 kernel来执行(相当于 asyncio 中的 loop). 当然一般情况下,我们不会主动去生成一个 kernel, 而是调用 curio.run 来交给 curio 隐式执行。

async def hello(name):
    print('hello', name)

run(hello, 'Guido')    # Preferred
run(hello('Guido'))    # Ok

tasks

前面说到,一个 coroutine 需要交给 curio 来运行,但是实际上 curio 运行的并不是这个 coroutine, 而是包含了这个 coroutine 的 task. task 可以认为是一个线程,而 coroutine 则可以看成是 target 函数。和线程一样,task 也分为了 daemon 的和非 daemon 的。当所有非 daemon 的 task 执行完毕之后, kernel 就会自动退出。这个和线程是类似的,所有的非 daemon 的线程执行完毕之后,整个进程就会退出。 而我们通过 curio.run 创建的那个 task 实际上就相当于是我们在多线程程序中的主线程了。

spawn

await spawn(corofunc, *args, daemon=False)

在多线程编程中,我们通过使用 t = Theaad(target=func); t.start() 来开始执行新的线程。然而, 在 curio 中,你不能通过 t = Task(target=func); t.start() 来创建新的 task. 而应改通过 t = await spawn(corofunc) 来创建并开始执行新的 coroutine.

Task join

可以使用 r = await task.join() 来等待 task 运行结束,并获得返回值。也可以使用 await task.wait() 但是不会返回值,必须之后再使用 task.result 获得返回值

v = await Task.join()  # 返回返回值

# or

await task.wait()
v = task.result  # 如果在 task 结束之前访问,会 raise RuntimeError

Task Group

curio 支持使用 task group 来管理一组任务,class TaskGroup(tasks=(), *, wait=all, name=None).

在创建 task group 的时候就可以把已经生成的 task 放入 group 中,或者随后使用 task_group.spawn/add_task 来向 group 中添加 task.

await TaskGroup.spawn(corofunc, *args, ignore_result=False)
# 生成一个 task, 并放入到该 task group 中

await TaskGroup.add_task(coro)
# 添加已有的 task 到该 task group 中
# 以上两个方法分别添加 corofunc 和 task 到当前 group 中

await TaskGroup.join(*, wait=all)
# 等待所有的 task 运行结束

await TaskGroup.cancel_remaining()
# 取消所有还在运行的 task
用在 with 语句和迭代器中

task group 可以用在 with 语句中,这样在 with 块退出的时候就会隐式地调用 task_group.join().

async with TaskGroup() as g:
    t1 = await g.spawn(func1)
    t2 = await g.spawn(func2)
    t3 = await g.spawn(func3)

# all tasks done here
print('t1 got', t1.result)
print('t2 got', t2.result)
print('t3 got', t3.result)

task group 还可以用作迭代器,其中包含了所有 task.result

async with TaskGroup() as g:
    t1 = await g.spawn(func1)
    t2 = await g.spawn(func2)
    t3 = await g.spawn(func3)
    async for task in g:
        print(task, 'completed.', task.result)

task local storage

class Local 类似于 threading.Local, 但是随着一个新的 context local storage PEP 的到来, 这个功能会被废弃掉。

time

使用 curio.sleep 而不是 time.sleep, 以为整个协程都是单线程的。

workers

如果需要运行一些 CPU 密集的任务或者是一些可能 block 住的任务,可以使用 workers.

有用的函数

await curio.workers.run_in_process(callable, *args)

如果取消对应的 coroutine 的话,相应的进程会收到 SIGTERM 而立即停止执行

await curio.workers.run_in_thread(callable, *args)

如果取消对应 coroutine 的话,相应的线程并不会停止执行,而是进入一种类似 zombie 的状态, 直到运行结束。

await curio.workers.block_in_thread(callable, *args)

类似 run_in_thread, 但是对用同一个 callable, 同时只有一个线程在执行。

curio.workers.MAX_WORKER_THREADS  # 同一个 kernel 能使用的最大的线程数,默认 64
curio.workers.MAX_WORKER_PROCESSES  # 同一个 kernel 能使用的最大进程数,默认 CPU 数量

文件

读取文件可能是个很耗时的工作,不光是读写磁盘,如果你的文件是在一个网络文件系统上,那么 将会更加耗时。如果在协程中发生这种操作,整个协程 kernel 都会被 block 住。

curio.file 提供了一些供异步读取文件的机制。

curio.file.aopen(*args, **kwargs)

需要注意的是,这个函数只能用在 Async Context Manager 中,而不能直接 f = await aopen()

async with aopen(filename) as:
    async for line in f:
        print(line)

原语

curio 提供了 Event, Lock, RLock, Semephore, BoundedSemaphore, Condition

queue

正如标准库提供了 queue 模块用于多线程之间通信一样,curio 提供了 curio.queue 来实现 task 之间的通信。用法和 queue 模块基本上是一样的,除了一些方法变成了 coroutine function, 而 不是普通的函数了。

异步线程

如果你需要执行很多的同步操作,但是还是想要能够和 curio 来交互,可以使用异步线程。在异步 线程内,可以使用AWAIT函数来实现await关键字的操作,可以使用普通的withfor来 实现使用了async with. 也就是实现了不用在 coroutine 内部而使用 coroutine 的操作。

另外值得注意的一点是,如果把定义的 async thread 当做同步版本的线程来运行,那么 AWIAT 就是一个 no-op, 也就是说可以直接把他当做同步线程来用。

WeChat Qr Code

© 2016-2022 Yifei Kong. Powered by ynotes

All contents are under the CC-BY-NC-SA license, if not otherwise specified.

Opinions expressed here are solely my own and do not express the views or opinions of my employer.

友情链接: MySQL 教程站