Python进阶-异步处理

TOC

asyncio模块

asyncio模块可以实现Python的异步编程,允许你编写并发代码,以处理I/O密集型任务。这种方式相比于传统的多线程或多进程方式更加高效,特别是在需要处理大量I/O操作时。
基本概念:

  • 协程:协程是比线程轻量级的执行单元,通过 async 和 await 关键字定义。
  • 事件循环:事件循环是一个调度器,负责管理和调度协程的执行。

使用方法

# 导入包
import asyncio

# 主要方法
"""
async def       定义协程函数。
await           用于等待某个异步操作的完成。
asyncio.run()   运行主协程并管理事件循环。
asyncio.gather()  同时运行多个协程任务
asyncio.sleep()   停止等待,不会阻塞主线程
"""
# 定义异步协程
async def fetch_data(num):
    print(f"Fetching data {num}...")
    await asyncio.sleep(2)  # 模拟网络延迟
    print(f"Data {num} fetched")


async def main():
    # 创建多个协程任务
    tasks = [fetch_data(i) for i in range(5)]
    # 等待所有任务完成
    await asyncio.gather(*tasks)
    print('All tasks have been completed.')


# 运行事件循环
if __name__ == "__main__":
    asyncio.run(main())
# 输出结果:
"""
Fetching data 0...
Fetching data 1...
Fetching data 2...
Fetching data 3...
Fetching data 4...
Data 0 fetched
Data 1 fetched
Data 2 fetched
Data 3 fetched
Data 4 fetched
"""

所有的"Fetching data"消息几乎是同时打印的,表明它们是并发处理的,而"Data … fetched"则是任务完成后的结果。

多任务同步执行

多个异步任务同时进行。

import asyncio


# 定义异步协程任务
async def print_task(data, delay):
    count = 1
    for i in data:
        print(f'进度: {count}/{len(data)} executing {i} task...')
        count += 1
        await asyncio.sleep(int(delay))


# 定义多个异步任务运行
async def main():
    test_list_a = ['download', 'install', 'clean']
    test_list_b = ['loading', 'generate', 'complete']
    await asyncio.gather(print_task(test_list_a, 1), print_task(test_list_b, 2))
    print('All tasks have been completed.')

asyncio.run(main())

主线程和异步任务同时进行

保证我在执行异步任务的同时我的主线程不受异步任务的影响,正常执行,不会被阻塞。

import threading
import time
import asyncio


# 定义异步任务方法
async def print_task(data, delay):
    count = 1
    for i in data:
        print(f'进度: {count}/{len(data)} executing {i} task...')
        count += 1
        await asyncio.sleep(int(delay))


# 定义异步执行多个任务方法
async def async_task():
    test_list_a = ['download', 'install', 'clean']
    test_list_b = ['loading', 'generate', 'complete']
    await asyncio.gather(print_task(test_list_a, 1), print_task(test_list_b, 2))
    print('All async tasks have been completed.')


# 运行事件循环
def run_async_task():
    asyncio.run(async_task())


# 定义主线程和异步任务
def main():
    # 创建子线程去执行异步任务
    thread = threading.Thread(target=run_async_task)
    thread.start()
    # 主线任务执行
    for _ in range(5):
        print("Main thread is running...")
        time.sleep(2)
    thread.join()


if __name__ == '__main__':
    main()
# 输出结果:
"""
Main thread is running...
进度: 1/3 executing download task...
进度: 1/3 executing loading task...
进度: 2/3 executing install task...
进度: 2/3 executing generate task...
进度: 3/3 executing clean task...
Main thread is running...
进度: 3/3 executing complete task...
Main thread is running...
All async tasks have been completed.
Main thread is running...
Main thread is running...
"""