Python中的异步功能入门(1)

ref: Getting Started With Async Features in Python

什么是同步

程序按顺序一步一步执行,每完成一步才可以继续执行下一步的过程,即为同步

同步的优劣

程序简单明了,但对于web服务很容易造成请求拥堵,因为需要处理完一个请求才能处理下一个,无法同时处理多个请求。

改善同步的方法1:轮询(Polling)

每隔一段时间,检查当前是否有其他任务需要处理,如果有则先去处理完后再返回原来的任务。

改善同步的方法2:线程(Threading)

克隆程序的多个部分代码,在共享同步一个内存空间的情况下,分别执行不同的任务。
但由于所有线程共享一个内存空间,所以如果一个线程正在使用其中一个资源/变量,则另一个线程就无法使用,必需等待该资源被释放后才可以继续使用。
因此,多线程容易导致数据损坏、在无效状态下读取数据以及数据混乱,大部分情况系统会正确处理这些问题,但有时我们也需要通过锁机制来确保重要的资源的准确性。

什么是阻塞调用

阻止CPU执行其它操作的代码,比如time.sleep函数或者IO操作

什么是异步/非阻塞调用

程序可以在执行某个任务时,不必等待其完成,便可以去执行另一个任务,等任务执行完成后再返回来处理结果。
Python 3 中提供了 asyncio/await 来实现异步。


接下来,我们来看看相关代码实例

同步机制代码:

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()

输出:

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do

使用yield实现简单的协同运行

关于yield的定义和用法可以查看这里1
根据之前的代码,加上yield:

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1   
           # 模拟阻塞调用:time.sleep(count)
            print(f"---Task {name} total: {total}")
            yield
        print(f"Finish Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()

输出:

Task One running
---Task One total: 1
Task Two running
---Task Two total: 1
---Task One total: 2
---Task Two total: 2
---Task One total: 3
---Task Two total: 3
---Task One total: 4
---Task Two total: 4
---Task One total: 5
---Task Two total: 5
---Task One total: 6
---Task Two total: 6
---Task One total: 7
---Task Two total: 7
---Task One total: 8
---Task Two total: 8
---Task One total: 9
---Task Two total: 9
---Task One total: 10
---Task Two total: 10
---Task One total: 11
Finish Task Two total: 10
Task Two running
---Task Two total: 1
---Task One total: 12
---Task Two total: 2
---Task One total: 13
---Task Two total: 3
---Task One total: 14
---Task Two total: 4
---Task One total: 15
---Task Two total: 5
Finish Task One total: 15
Task One running
---Task One total: 1
Finish Task Two total: 5
---Task One total: 2
Finish Task One total: 2

可以看到两个任务交换执行,类似异步,实际上依旧是同步程序。
当任务是IO密集型时,比如在tas函数中的total += 1下添加time.sleep(count)来模拟IO操作,这时就会产生阻塞,效率就会和普通的同步程序差不多。

使用 asyncio/await 实现异步

Python 异步系统的核心是“事件循环(event loop)”,它运行所有代码,包括main()。
当执行到 await 关键字时,会发生上下文切换,执行控制权会回到”事件循环“,然后”事件循环“会查找已完成的事件,然后把执行控制权传递给已完成的任务。
所以即使异步程序以单线程执行,也可以实现异步的功能,但要手动控制上下文切换时对数据的影响。

import asyncio
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    asyncio.run(main())

输出:

Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0

可以看到,程序的总执行时间少于各部分的总和,这就是异步功能的作用,可以充分利用CPU,让其可以同时执行多个任务。


  1. yield用于定义生成器函数(generator function),生成器函数与普通函数的区别在于:

    1. 生成器函数包含yield语句,普通函数使用return语句返回结果。
    2. 当生成器函数被调用时,它返回一个生成器对象(generator object),而不是立即执行函数体。
    3. 当第一次调用生成器的next()方法或使用for循环迭代它时,函数开始执行,直到遇到yield语句为止。yield会返回一个值,并暂停函数的执行。
    4. 当再次调用next()或继续for循环时,函数从上次暂停的位置继续执行,直到再次遇到yield。
    5. 当函数执行完毕没有更多的yield语句时,生成器自动抛出StopIteration异常结束迭代。
      比如:
    def countdown(n):
        while n > 0:
            yield n
            n -= 1
        
    for i in countdown(5):
        print(i)
    

    输出:

    5
    4
    3
    2
    1
    

    可以看到,生成器让我们可以用非常简洁的方式生成一系列的值,而不需要构造一个列表存储所有的值。这在处理大量数据时非常有用,因为生成器并不需要在内存中存储所有生成的值。
    总结一下yield的特点和优点:

    1. 使用yield可以实现延迟计算(lazy evaluation),按需生成结果,节省内存。
    2. 生成器函数可以看成是一种特殊的迭代器,它自动实现了__iter__和__next__方法。
    3. 生成器表达式提供了一种更简洁的创建生成器的语法。
    4. Python协程(coroutine)也是使用yield语句来实现的。
    5. 使用yield语句可以非常方便地实现管道(pipeline)式的数据处理。
     ↩︎

Contents