Versun

对待生命,不妨大胆一点,因为我们终将失去它



Python中的异步功能入门(1)

2024-05-23

ref:
Getting Started With Async Features in Python

什么是同步

程序按顺序一步一步执行,每完成一步才可以继续执行下一步的过程,即为同步

同步的优劣

程序简单明了,但对于web服务很容易造成请求拥堵,因为需要处理完一个请求才能处理下一个,无法同时处理多个请求。

改善同步的方法1:轮询(Polling)

每隔一段时间,检查当前是否有其他任务需要处理,如果有则先去处理完后再返回原来的任务。

改善同步的方法2:线程(Threading)

克隆程序的多个部分代码,在共享同步一个内存空间的情况下,分别执行不同的任务。
但由于所有线程共享一个内存空间,所以如果一个线程正在使用其中一个资源/变量,则另一个线程就无法使用,必需等待该资源被释放后才可以继续使用。
因此,多线程容易导致数据损坏、在无效状态下读取数据以及数据混乱,大部分情况系统会正确处理这些问题,但有时我们也需要通过锁机制来确保重要的资源的准确性。

什么是阻塞调用

阻止CPU执行其它操作的代码,比如time.sleep函数或者IO操作

什么是异步/非阻塞调用

程序可以在执行某个任务时,不必等待其完成,便可以去执行另一个任务,等任务执行完成后再返回来处理结果。
Python 3 中提供了 asyncio/await 来实现异步。


接下来,我们来看看相关代码实例

同步机制代码:

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()

输出:

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do

使用yield实现简单的协同运行

关于yield的定义和用法可以查看这里[^1]
根据之前的代码,加上yield:

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1   
           # 模拟阻塞调用:time.sleep(count)
            print(f"---Task {name} total: {total}")
            yield
        print(f"Finish Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()

输出:

Task One running
---Task One total: 1
Task Two running
---Task Two total: 1
---Task One total: 2
---Task Two total: 2
---Task One total: 3
---Task Two total: 3
---Task One total: 4
---Task Two total: 4
---Task One total: 5
---Task Two total: 5
---Task One total: 6
---Task Two total: 6
---Task One total: 7
---Task Two total: 7
---Task One total: 8
---Task Two total: 8
---Task One total: 9
---Task Two total: 9
---Task One total: 10
---Task Two total: 10
---Task One total: 11
Finish Task Two total: 10
Task Two running
---Task Two total: 1
---Task One total: 12
---Task Two total: 2
---Task One total: 13
---Task Two total: 3
---Task One total: 14
---Task Two total: 4
---Task One total: 15
---Task Two total: 5
Finish Task One total: 15
Task One running
---Task One total: 1
Finish Task Two total: 5
---Task One total: 2
Finish Task One total: 2

可以看到两个任务交换执行,类似异步,实际上依旧是同步程序。
当任务是IO密集型时,比如在tas函数中的total += 1下添加time.sleep(count)来模拟IO操作,这时就会产生阻塞,效率就会和普通的同步程序差不多。

使用 asyncio/await 实现异步

Python 异步系统的核心是“事件循环(event loop)”,它运行所有代码,包括main()。
当执行到 await 关键字时,会发生上下文切换,执行控制权会回到”事件循环“,然后”事件循环“会查找已完成的事件,然后把执行控制权传递给已完成的任务。
所以即使异步程序以单线程执行,也可以实现异步的功能,但要手动控制上下文切换时对数据的影响。

import asyncio
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    asyncio.run(main())

输出:

Task One running
Task Two running
Task Two total elapsed time: 10.0
Task Two running
Task One total elapsed time: 15.0
Task One running
Task Two total elapsed time: 5.0
Task One total elapsed time: 2.0

Total elapsed time: 17.0

可以看到,程序的总执行时间少于各部分的总和,这就是异步功能的作用,可以充分利用CPU,让其可以同时执行多个任务。

[^1]:
yield用于定义生成器函数(generator function),生成器函数与普通函数的区别在于:

  1. 生成器函数包含yield语句,普通函数使用return语句返回结果。
  2. 当生成器函数被调用时,它返回一个生成器对象(generator object),而不是立即执行函数体。
  3. 当第一次调用生成器的next()方法或使用for循环迭代它时,函数开始执行,直到遇到yield语句为止。yield会返回一个值,并暂停函数的执行。
  4. 当再次调用next()或继续for循环时,函数从上次暂停的位置继续执行,直到再次遇到yield。
  5. 当函数执行完毕没有更多的yield语句时,生成器自动抛出StopIteration异常结束迭代。
    比如:

    def countdown(n):
        while n > 0:
            yield n
            n -= 1
    
    for i in countdown(5):
        print(i)

    输出:

    5
    4
    3
    2
    1

    可以看到,生成器让我们可以用非常简洁的方式生成一系列的值,而不需要构造一个列表存储所有的值。这在处理大量数据时非常有用,因为生成器并不需要在内存中存储所有生成的值。
    总结一下yield的特点和优点:

  6. 使用yield可以实现延迟计算(lazy evaluation),按需生成结果,节省内存。
  7. 生成器函数可以看成是一种特殊的迭代器,它自动实现了iternext方法。
  8. 生成器表达式提供了一种更简洁的创建生成器的语法。
  9. Python协程(coroutine)也是使用yield语句来实现的。
  10. 使用yield语句可以非常方便地实现管道(pipeline)式的数据处理。

2024-05-23 00:58:21 UTC

2024-05-23

Gitpod的云端 vscode 优化的有点厉害啊,同样的项目、同样的插件,内存占用比Github的Codespaces低很多,能多开好几个docker了!!

2024-05-23-00-58-21-utc-4aeb3f21.png 25.1 KB


2024-05-20 00:13:34 UTC

2024-05-20

刚看完《A.I.创世者 / The Creator》,比预想中的要好看,那小孩(Alphie / Madeleine Yuna Voyles)的演技炸裂,特别是最后一幕,深刻演示了“喜极而泣"。
准备去看看1个小时的幕后花絮《True Love: Making ‘The Creator’》

2024-05-20-00-13-34-utc-d966f697.jpeg 1.28 MB


主流LLM模型翻译测试

2024-05-19

目的:为了给RSS翻译器选一个可替代gpt-3.5-turbo的便宜模型做为主力翻译引擎。

判定规则:能翻译出下方3句话,质量能看懂即可,且仅输出翻译结果

提示词:

System Message: Translate only the text into Chinese, return only the translations, do not explain the original text.   
User Messages:   
Don't ask if a monorepo is good for you – ask if you're good enough for a monorepo    
Want more ways to post? for iOS, macOS, Android, and the web.   
Does Offering ChatGPT a Tip Cause it to Generate Better Text? An Analysis  

参数:

temperature=0.2,   
top_p=0.2,   
max_tokens = 1000     

注意:
测试均使用API进行,非playground环境。
仅供本人参考使用,非常不严谨,勿喷,谢谢。
不定时更新

在线可筛选版


Django数据迁移历险记

2024-05-18

django-5e5c7dbb.png 595 Bytes
不到万不得已,不建议手写migration文件,大佬除外。

事情的起因:

在RSS翻译器的缓存模型中,有一个做为hash的主键,当初脑抽使用了BinaryField类型,导致数据库的检索速度一直不理想。
所以这次准备换成检索速度最快的Integer类型。
正如所料,事情没那么简单,这事得从hash的计算方式说起。

过程:

为了能在低配置的机器上尽可能快的运行,我使用了谷歌开发的CityHash64来计算hash,理由很简单,它的代码逻辑比SHA的要简单的多,对于长文的计算要快的多,又能保证唯一性,所以非常理想。

接着在将BinaryField改为BigIntegerField,并在migrate后,测试存储数据时,程序报错了:OverflowError: Python int too large to convert to SQLite INTEGER,一番GPT后才知道,SQLite只能存储64位的有符号整数,而CityHash64的值是无符号64位,超出了范围。

于是,为了方便后期维护和扩展,决定使用char类型来存储值,虽然比Integer的检索慢,但至少比Binary快呀。
既然使用了char类型,就没有大小限制,果断升级到CityHash128来计算hash,并限制39个字符数,相关代码在此

接下来就是重头戏:数据迁移
首先需要重新计算所有的hash值,所以简单的类型migrate只是做了第一步,还需要在migration文件中手动更新数据。
于是我更改了migration文件:

from django.db import migrations, models
import cityhash

def update_hash(apps, schema_editor):
    MyModel = apps.get_model('translator', 'translated_content')
    for obj in MyModel.objects.all():
        obj.hash = cityhash.CityHash128(f"{obj.original_content}{obj.translated_language}")
        obj.save()

class Migration(migrations.Migration):
    dependencies = [
        ('translator', '0030_alter_azureaitranslator_content_translate_prompt_and_more'),
    ]

    operations = [
        migrations.AlterField(
            model_name='translated_content',
            name='hash',
            field=models.CharField(editable=False, max_length=39, primary_key=True, serialize=False),
        ),
        migrations.RunPython(update_hash),
    ]

加了update_hash函数和RunPython,不要问为啥不用bulk_update,因为它无法用于主键类型字段的更新。
接着migrate没有报错,添加数据也正常,在我以为大功告成时,我检查了表里的数据,果然我还是太乐观了,表里原有的数据没有被更新,而是重新创建了新的一条数据。

又是一番GPT,了解到Django为了安全,任何migrate都不能修改原有的数据,但我不死心,改了update_hash函数:

def update_hash(apps, schema_editor):
    MyModel = apps.get_model('translator', 'translated_content')
    for obj in MyModel.objects.all():
        new_hash = cityhash.CityHash128(f"{obj.original_content}{obj.translated_language}")
        MyModel.objects.filter(hash=obj.hash).update(hash=new_hash)

这次没有意外,依旧无法修改,filter无法正常筛选出数据,一直为空。

只能另辟蹊径,创建多个migration来完成这次的迁移:

  1. 保留原有的hash字段(BinaryField),创建新字段hash_new(CharField)。(代码
  2. 计算hash_new值。(代码
  3. 删除hash字段,并设置hash_new为主键。(代码
  4. 重命名hash_new为hash。(代码
    同时,为了数据安全,写了一个backup_db函数,在执行迁移前,拷贝一份sqlite数据库做备份

好了,万事俱备,只欠东风。
本地运行测试,一切正常。
服务器dev环境运行测试,一切正常。
demo环境运行测试,一切正常。
个人使用的生产环境运行测试,一切正常。

在我以为肯定还有什么问题的时候,它运行正常了。。。。
我怀着忐忑不安的心,将新的docker镜像push了。。。。

接下来就是等待迎接新的Bug了。

PS:我相信我的方案肯定不是最完美的,如果你有更好的方案,欢迎留言联系我哈,感激不尽!


2024-05-18 08:46:58 UTC

2024-05-18

纪念,今天是我第一次徒手写Django的migration文件!


2024-05-17 12:33:30 UTC

2024-05-17

之前推特在严格模式下是可以开的,现在不行了,估计需要更多数据来训练AI。。。
至于图2中所说的像往常一样保持一致,有种此地无银三百两的感觉。。。。

2024-05-17-12-33-30-utc-26070ab1.png 67.2 KB
2024-05-17-12-33-30-utc-cd5e49b6.png 48.2 KB


2024-05-17 06:58:57 UTC

2024-05-17

看到LDAPjs的维护者因为一封恶意邮件而归档了项目,深感惋惜,开源环境正在变的越来越差,我们该做些什么呢?
还有一篇 这个项目还在维护吗?


2024-05-17 06:12:23 UTC

2024-05-17

很喜欢NetBSD的主页设计和颜色搭配


2024-05-16 07:35:18 UTC

2024-05-16

在我Debug了2天后,才发现Django的ModelAdmin模块中, list_filter也会调用get_search_results函数来查询数据库
因此如果你设置了list_filter列表,但在admin页面上并没有成功筛选
可能是你自定义了get_search_results函数,对筛选结果进行了额外的操作。