ref:
Getting Started With Async Features in Python
什么是同步
程序按顺序一步一步执行,每完成一步才可以继续执行下一步的过程,即为同步
同步的优劣
程序简单明了,但对于web服务很容易造成请求拥堵,因为需要处理完一个请求才能处理下一个,无法同时处理多个请求。
改善同步的方法1:轮询(Polling)
每隔一段时间,检查当前是否有其他任务需要处理,如果有则先去处理完后再返回原来的任务。
改善同步的方法2:线程(Threading)
克隆程序的多个部分代码,在共享同步一个内存空间的情况下,分别执行不同的任务。
但由于所有线程共享一个内存空间,所以如果一个线程正在使用其中一个资源/变量,则另一个线程就无法使用,必需等待该资源被释放后才可以继续使用。
因此,多线程容易导致数据损坏、在无效状态下读取数据以及数据混乱,大部分情况系统会正确处理这些问题,但有时我们也需要通过锁机制来确保重要的资源的准确性。
什么是阻塞调用
阻止CPU执行其它操作的代码,比如time.sleep函数或者IO操作
什么是异步/非阻塞调用
程序可以在执行某个任务时,不必等待其完成,便可以去执行另一个任务,等任务执行完成后再返回来处理结果。
Python 3 中提供了 asyncio/await 来实现异步。
接下来,我们来看看相关代码实例
同步机制代码:
import queue
def task(name, work_queue):
if work_queue.empty():
print(f"Task {name} nothing to do")
else:
while not work_queue.empty():
count = work_queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
print(f"Task {name} total: {total}")
def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = queue.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some synchronous tasks
tasks = [(task, "One", work_queue), (task, "Two", work_queue)]
# Run the tasks
for t, n, q in tasks:
t(n, q)
if __name__ == "__main__":
main()
输出:
Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do
使用yield实现简单的协同运行
关于yield的定义和用法可以查看这里[^1]
根据之前的代码,加上yield:
import queue
def task(name, queue):
while not queue.empty():
count = queue.get()
total = 0
print(f"Task {name} running")
for x in range(count):
total += 1
# 模拟阻塞调用:time.sleep(count)
print(f"---Task {name} total: {total}")
yield
print(f"Finish Task {name} total: {total}")
def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = queue.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
work_queue.put(work)
# Create some tasks
tasks = [task("One", work_queue), task("Two", work_queue)]
# Run the tasks
done = False
while not done:
for t in tasks:
try:
next(t)
except StopIteration:
tasks.remove(t)
if len(tasks) == 0:
done = True
if __name__ == "__main__":
main()
输出:
Task One running
---Task One total: 1
Task Two running
---Task Two total: 1
---Task One total: 2
---Task Two total: 2
---Task One total: 3
---Task Two total: 3
---Task One total: 4
---Task Two total: 4
---Task One total: 5
---Task Two total: 5
---Task One total: 6
---Task Two total: 6
---Task One total: 7
---Task Two total: 7
---Task One total: 8
---Task Two total: 8
---Task One total: 9
---Task Two total: 9
---Task One total: 10
---Task Two total: 10
---Task One total: 11
Finish Task Two total: 10
Task Two running
---Task Two total: 1
---Task One total: 12
---Task Two total: 2
---Task One total: 13
---Task Two total: 3
---Task One total: 14
---Task Two total: 4
---Task One total: 15
---Task Two total: 5
Finish Task One total: 15
Task One running
---Task One total: 1
Finish Task Two total: 5
---Task One total: 2
Finish Task One total: 2
可以看到两个任务交换执行,类似异步,实际上依旧是同步程序。
当任务是IO密集型时,比如在tas函数中的total += 1
下添加time.sleep(count)
来模拟IO操作,这时就会产生阻塞,效率就会和普通的同步程序差不多。
使用 asyncio/await 实现异步
Python 异步系统的核心是“事件循环(event loop)”,它运行所有代码,包括main()。
当执行到 await 关键字时,会发生上下文切换,执行控制权会回到”事件循环“,然后”事件循环“会查找已完成的事件,然后把执行控制权传递给已完成的任务。
所以即使异步程序以单线程执行,也可以实现异步的功能,但要手动控制上下文切换时对数据的影响。
import asyncio
from codetiming import Timer
async def task(name, work_queue):
timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
while not work_queue.empty():
delay = await work_queue.get()
print(f"Task {name} running")
timer.start()
await asyncio.sleep(delay)
timer.stop()
async def main():
"""
This is the main entry point for the program
"""
# Create the queue of work
work_queue = asyncio.Queue()
# Put some work in the queue
for work in [15, 10, 5, 2]:
await work_queue.put(work)
# Run the tasks
with Timer(text="\nTotal elapsed time: {:.1f}"):
await asyncio.gather(
asyncio.create_task(task("One", work_queue)),
asyncio.create_task(task("Two", work_queue)),
)
if __name__ == "__main__":
asyncio.run(main())
输出:
Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0
Total elapsed time: 17.0
可以看到,程序的总执行时间少于各部分的总和,这就是异步功能的作用,可以充分利用CPU,让其可以同时执行多个任务。
[^1]:
yield用于定义生成器函数(generator function),生成器函数与普通函数的区别在于:
- 生成器函数包含yield语句,普通函数使用return语句返回结果。
- 当生成器函数被调用时,它返回一个生成器对象(generator object),而不是立即执行函数体。
- 当第一次调用生成器的next()方法或使用for循环迭代它时,函数开始执行,直到遇到yield语句为止。yield会返回一个值,并暂停函数的执行。
- 当再次调用next()或继续for循环时,函数从上次暂停的位置继续执行,直到再次遇到yield。
-
当函数执行完毕没有更多的yield语句时,生成器自动抛出StopIteration异常结束迭代。
比如:def countdown(n): while n > 0: yield n n -= 1 for i in countdown(5): print(i)
输出:
5 4 3 2 1
可以看到,生成器让我们可以用非常简洁的方式生成一系列的值,而不需要构造一个列表存储所有的值。这在处理大量数据时非常有用,因为生成器并不需要在内存中存储所有生成的值。
总结一下yield的特点和优点: - 使用yield可以实现延迟计算(lazy evaluation),按需生成结果,节省内存。
- 生成器函数可以看成是一种特殊的迭代器,它自动实现了iter和next方法。
- 生成器表达式提供了一种更简洁的创建生成器的语法。
- Python协程(coroutine)也是使用yield语句来实现的。
- 使用yield语句可以非常方便地实现管道(pipeline)式的数据处理。
发表回复