TIL

Python中的异步功能入门(1)

ref: Getting Started With Async Features in Python

什么是同步

程序按顺序一步一步执行,每完成一步才可以继续执行下一步的过程,即为同步

同步的优劣

程序简单明了,但对于web服务很容易造成请求拥堵,因为需要处理完一个请求才能处理下一个,无法同时处理多个请求。

改善同步的方法1:轮询(Polling)

每隔一段时间,检查当前是否有其他任务需要处理,如果有则先去处理完后再返回原来的任务。

改善同步的方法2:线程(Threading)

克隆程序的多个部分代码,在共享同步一个内存空间的情况下,分别执行不同的任务。
但由于所有线程共享一个内存空间,所以如果一个线程正在使用其中一个资源/变量,则另一个线程就无法使用,必需等待该资源被释放后才可以继续使用。
因此,多线程容易导致数据损坏、在无效状态下读取数据以及数据混乱,大部分情况系统会正确处理这些问题,但有时我们也需要通过锁机制来确保重要的资源的准确性。

什么是阻塞调用

阻止CPU执行其它操作的代码,比如time.sleep函数或者IO操作

什么是异步/非阻塞调用

程序可以在执行某个任务时,不必等待其完成,便可以去执行另一个任务,等任务执行完成后再返回来处理结果。
Python 3 中提供了 asyncio/await 来实现异步。


接下来,我们来看看相关代码实例

同步机制代码:

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()

输出:

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do

使用yield实现简单的协同运行

关于yield的定义和用法可以查看这里1
根据之前的代码,加上yield:

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1   
           # 模拟阻塞调用:time.sleep(count)
            print(f"---Task {name} total: {total}")
            yield
        print(f"Finish Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()

输出:

Task One running
---Task One total: 1
Task Two running
---Task Two total: 1
---Task One total: 2
---Task Two total: 2
---Task One total: 3
---Task Two total: 3
---Task One total: 4
---Task Two total: 4
---Task One total: 5
---Task Two total: 5
---Task One total: 6
---Task Two total: 6
---Task One total: 7
---Task Two total: 7
---Task One total: 8
---Task Two total: 8
---Task One total: 9
---Task Two total: 9
---Task One total: 10
---Task Two total: 10
---Task One total: 11
Finish Task Two total: 10
Task Two running
---Task Two total: 1
---Task One total: 12
---Task Two total: 2
---Task One total: 13
---Task Two total: 3
---Task One total: 14
---Task Two total: 4
---Task One total: 15
---Task Two total: 5
Finish Task One total: 15
Task One running
---Task One total: 1
Finish Task Two total: 5
---Task One total: 2
Finish Task One total: 2

可以看到两个任务交换执行,类似异步,实际上依旧是同步程序。
当任务是IO密集型时,比如在tas函数中的total += 1下添加time.sleep(count)来模拟IO操作,这时就会产生阻塞,效率就会和普通的同步程序差不多。

使用 asyncio/await 实现异步

Python 异步系统的核心是“事件循环(event loop)”,它运行所有代码,包括main()。
当执行到 await 关键字时,会发生上下文切换,执行控制权会回到”事件循环“,然后”事件循环“会查找已完成的事件,然后把执行控制权传递给已完成的任务。
所以即使异步程序以单线程执行,也可以实现异步的功能,但要手动控制上下文切换时对数据的影响。

import asyncio
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    asyncio.run(main())

输出:

Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0

可以看到,程序的总执行时间少于各部分的总和,这就是异步功能的作用,可以充分利用CPU,让其可以同时执行多个任务。


  1. yield用于定义生成器函数(generator function),生成器函数与普通函数的区别在于:

    1. 生成器函数包含yield语句,普通函数使用return语句返回结果。
    2. 当生成器函数被调用时,它返回一个生成器对象(generator object),而不是立即执行函数体。
    3. 当第一次调用生成器的next()方法或使用for循环迭代它时,函数开始执行,直到遇到yield语句为止。yield会返回一个值,并暂停函数的执行。
    4. 当再次调用next()或继续for循环时,函数从上次暂停的位置继续执行,直到再次遇到yield。
    5. 当函数执行完毕没有更多的yield语句时,生成器自动抛出StopIteration异常结束迭代。
      比如:
    def countdown(n):
        while n > 0:
            yield n
            n -= 1
        
    for i in countdown(5):
        print(i)
    

    输出:

    5
    4
    3
    2
    1
    

    可以看到,生成器让我们可以用非常简洁的方式生成一系列的值,而不需要构造一个列表存储所有的值。这在处理大量数据时非常有用,因为生成器并不需要在内存中存储所有生成的值。
    总结一下yield的特点和优点:

    1. 使用yield可以实现延迟计算(lazy evaluation),按需生成结果,节省内存。
    2. 生成器函数可以看成是一种特殊的迭代器,它自动实现了__iter__和__next__方法。
    3. 生成器表达式提供了一种更简洁的创建生成器的语法。
    4. Python协程(coroutine)也是使用yield语句来实现的。
    5. 使用yield语句可以非常方便地实现管道(pipeline)式的数据处理。
     ↩︎

在我Debug了2天后,才发现Django的ModelAdmin模块中, list_filter也会调用get_search_results函数来查询数据库
因此如果你设置了list_filter列表,但在admin页面上并没有成功筛选
可能是你自定义了get_search_results函数,对筛选结果进行了额外的操作。

关于Python的 __pycache__ 文件夹小记

ref: https://realpython.com/python-pycache/#what-actions-invalidate-the-cache

  • Python 中的 __pycache__ 文件夹是什么?
    Python模块的缓存文件夹,将需要的模块编译为字节码,并缓存(.pyc)到该文件夹中,实现更快的导入速度。
    除了__pycache__ 文件夹,Python还在内存中创建了模块缓存,缓存需要导入多次的模块,减少导入模块的开销。
  • 如何判断缓存的模块是否过期?
    默认基于时间戳判断,也可以基于哈希值
  • 即使使用了 from … import 语法,Python 还是会读取并编译整个模块,包括未使用的。
  • 可以在python命令后使用-X importtime参数来显示每个模块的导入时间
  • 递归删除所有 __pycache__ 文件夹(linux):find . -type d -name __pycache__ -exec rm -rf {} +
  • 如何禁止Python创建缓存文件?
    向 python 命令传递 -B 选项,或者设置环境变量PYTHONDONTWRITEBYTECODE=1
  • 集中存储缓存
    方法1:python -X pycache_prefix=/tmp/pycache calculator.py
    方法2:设置环境变量PYTHONPYCACHEPREFIX=/tmp/pycache
    它会在指定的文件夹下镜像项目的目录结构,由于这种集中式缓存的层次结构与项目结构相匹配,因此可以在多个项目之间共享该缓存文件夹

原来python的format函数是必需给所有的占位符赋值的,不能只赋值其中一个。
比如下面的代码会报错,因为country没有赋值

my_str = "My name is {name}, I'm from {country}"     
print( my_str.format(name='Versun') )     

所以如果对于用户输入的字符串,比如用于AI的提示词,用replace更合适些。

TIL: Linux查看具体进程的内存占用情况

cat /proc/[PID]/status | grep VmRSS

TIL-什么是RAG

RAG是Retrieval Augmented Generation的缩写,中文名为检索增强生成。

它是一种结合了信息检索和文本生成的技术,旨在增强大语言模型处理知识密集型任务的能力。

RAG的基本工作流程如下

  1. 检索:根据用户的查询,利用检索模型从外部知识库中获取相关的背景信息。通常是将查询向量化,然后在向量数据库中进行相似度搜索,找出最相关的若干条记录。
  2. 增强:将用户查询和检索到的背景信息一起嵌入到预设的提示模板中,生成增强后的提示。
  3. 生成:将增强后的提示输入到语言模型中,生成最终的输出文本。 通过融合外部知识,RAG可以让语言模型生成更加准确、符合上下文的回答,减少幻觉和错误信息。同时RAG也比较灵活,可以通过更新知识库来适应知识的变化,而无需重新训练整个语言模型

与微调(fine-tuning)相比,RAG有以下优势

  1. 更新知识更加高效,只需修改知识库即可,不用重新训练模型
  2. 更适合知识会随时间变化的动态场景
  3. 可以利用更大规模的外部知识,不受模型参数规模的限制