Python异步编程asyncio全解:8大核心知识点+实战案例,超高并发神器

Python异步编程asyncio是Python实现超高并发的核心解决方案,基于协程实现,比多线程更轻量、更高效,完美适配网络请求、接口调用、消息队列等 I/O 密集型场景。本文从零讲解asyncio的基础协程、事件循环、async/await语法、任务创建、并发控制,搭配实战案例,让你的程序轻松支撑万级并发。

Python异步编程asyncio 核心知识体系总览图 协程/事件循环/async/await/任务

图1:Python异步编程asyncio – 核心知识体系总览

一、Python异步编程asyncio:异步编程核心概念(必懂)

在asyncio体系中,异步编程的核心是非阻塞执行:程序在等待 I/O 操作(如网络请求、文件读写)时,不闲置CPU,而是去执行其他任务,大幅提升执行效率,这是asyncio的核心优势,也是区别于同步编程、多线程编程的关键。

  • 协程(Coroutine):轻量级执行单元,比线程更省资源,单进程可创建上万协程(asyncio核心执行单元)
  • async/await:Python3.5+ 引入的异步语法,简洁实现协程,替代老旧的 yield from(asyncio基础语法)
  • 事件循环(Event Loop):异步程序的核心,负责调度协程、管理任务、处理 I/O 回调(asyncio调度核心)
  • 任务(Task):对协程的封装,让协程可被事件循环调度、取消、获取结果
  • 未来对象(Future):表示异步操作的最终结果,Task 是 Future 的子类

asyncio 适用场景深度解析

asyncio并非万能,它的核心优势场景是纯 I/O 密集型任务,以下是最适合用asyncio的场景:

  • 网络接口调用:API请求、微服务调用、第三方接口对接,是asyncio最常用的场景
  • 异步爬虫:单进程万级并发,效率远超多线程爬虫,适合大规模数据采集
  • 消息队列消费:MQTT、RabbitMQ等消息队列的异步消费,高并发场景下性能拉满
  • 文件异步读写:大文件读写、批量文件处理,避免阻塞主线程
  • 定时任务调度:异步定时任务,无需多线程,轻量高效

不适用场景:CPU密集型任务(如数据计算、图像处理),asyncio无法发挥优势,建议用多进程。

asyncio 异步 vs 多线程 核心优势

  • 更轻量:协程切换由程序控制,无系统内核开销,线程切换有内核开销
  • 更高并发:单进程可轻松实现上万协程并发,线程过多会导致系统卡顿
  • 更简洁:async/await 语法同步式写法,无需处理锁和线程安全问题
  • 适用场景:纯 I/O 密集型,比多线程效率高 5-10 倍

Python异步编程asyncio 异步vs多线程核心优势对比图 轻量/并发/开销/适用场景

图2:Python异步编程asyncio – 异步vs多线程核心区别对比

二、Python异步编程asyncio:asyncio 基础语法 async/await 入门

asyncio的核心是 async 定义协程函数,await 挂起协程执行,等待异步操作完成,这是asyncio的基础语法。

1. 定义协程函数

import asyncio

# 用 async def 定义协程函数(asyncio标准写法)
async def hello():
    print("开始执行协程")
    # await 挂起,等待异步操作完成(此处用sleep模拟I/O等待)
    await asyncio.sleep(2)
    print("协程执行完成")

# 协程函数调用不会直接执行,返回协程对象
coro = hello()
print(coro)

2. 运行协程:启动事件循环

import asyncio

async def hello():
    print("开始执行协程")
    await asyncio.sleep(2)
    print("协程执行完成")

# 方式1:Python3.7+ 推荐写法,自动创建并运行事件循环
if __name__ == '__main__':
    asyncio.run(hello())

# 方式2:手动创建事件循环(低版本兼容)
# loop = asyncio.get_event_loop()
# loop.run_until_complete(hello())
# loop.close()

Python异步编程asyncio 事件循环调度协程流程示意图

图3:Python异步编程asyncio – 事件循环调度协程执行流程

三、Python异步编程asyncio:异步并发 创建多任务(核心)

单协程无并发效果,通过 asyncio.create_task() 封装协程为任务,实现多协程并发执行,这是asyncio的核心用法。

1. 基础并发:create_task 创建任务

import asyncio
import time

# 模拟I/O密集型任务:网络请求/接口调用
async def task(name, t):
    print(f"任务{name}开始,等待{t}秒")
    await asyncio.sleep(t)  # 异步sleep,不阻塞事件循环
    return f"任务{name}完成"

async def main():
    start = time.time()
    # 创建3个异步任务,立即加入事件循环调度
    t1 = asyncio.create_task(task("A", 2))
    t2 = asyncio.create_task(task("B", 2))
    t3 = asyncio.create_task(task("C", 2))
    
    # 等待所有任务完成,获取结果
    res1 = await t1
    res2 = await t2
    res3 = await t3
    
    print(res1, res2, res3)
    print(f"总耗时:{time.time()-start:.2f}秒")  # 并发执行,总耗时≈2秒

if __name__ == '__main__':
    asyncio.run(main())

关键说明:3个任务各需2秒,串行执行需6秒,异步并发仅需2秒,CPU在等待I/O时不断切换任务,无闲置时间,完美体现asyncio的并发优势。

2. 批量任务:gather 统一调度

当任务数量较多时,用 asyncio.gather() 批量管理任务,无需手动创建多个task,代码更简洁,是asyncio企业级写法。

import asyncio
import time

async def task(name, t):
    print(f"任务{name}开始,等待{t}秒")
    await asyncio.sleep(t)
    return f"任务{name}完成"

async def main():
    start = time.time()
    # 批量创建协程,gather自动封装为任务并并发执行
    results = await asyncio.gather(
        task("A", 2),
        task("B", 2),
        task("C", 2)
    )
    print(results)  # ['任务A完成', '任务B完成', '任务C完成']
    print(f"总耗时:{time.time()-start:.2f}秒")

if __name__ == '__main__':
    asyncio.run(main())

四、Python异步编程asyncio:异步进阶 任务的控制与管理

实际开发中需要对异步任务进行取消、超时控制、异常捕获,asyncio 提供了完善的API支持,是asyncio工程化的必备能力。

1. 任务取消:task.cancel()

import asyncio

async def long_task():
    print("长任务开始,需要10秒")
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("长任务被取消")
        raise  # 必须抛出异常,让事件循环感知任务取消
    return "长任务完成"

async def main():
    task = asyncio.create_task(long_task())
    # 等待1秒后取消任务
    await asyncio.sleep(1)
    task.cancel()
    # 捕获任务取消异常
    try:
        await task
    except asyncio.CancelledError:
        print("主函数捕获:任务已取消")

if __name__ == '__main__':
    asyncio.run(main())

2. 超时控制:timeout_after(Python3.11+)

防止异步任务无限等待,用 asyncio.timeout_after() 设置超时时间,超时自动取消任务,是asyncio稳定性保障。

import asyncio

async def task():
    await asyncio.sleep(5)
    return "任务完成"

async def main():
    try:
        # 设置超时时间2秒
        async with asyncio.timeout_after(2):
            res = await task()
            print(res)
    except TimeoutError:
        print("任务超时!")

if __name__ == '__main__':
    asyncio.run(main())

五、Python异步编程asyncio:实战案例 异步爬虫(超高并发版)

爬虫是典型的 I/O 密集型场景,用 asyncio 结合异步请求库 aiohttp,可实现单进程万级并发,效率远超多线程爬虫,是asyncio的经典实战场景。

1. 安装异步请求库 aiohttp

pip install aiohttp

2. 异步爬虫完整代码

import asyncio
import aiohttp
import time

# 异步请求函数
async def fetch(session, url):
    try:
        async with session.get(url, timeout=5) as response:
            # 获取响应状态码和内容长度
            return f"URL:{url} | 状态码:{response.status} | 长度:{len(await response.read())}"
    except Exception as e:
        return f"URL:{url} | 异常:{str(e)}"

# 主爬虫协程
async def async_crawler(urls):
    # 创建异步会话池,复用连接,提升效率
    async with aiohttp.ClientSession() as session:
        # 批量创建请求任务
        tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
        # 并发执行,获取所有结果
        results = await asyncio.gather(*tasks)
        for res in results:
            print(res)

if __name__ == '__main__':
    # 待爬取URL列表(模拟10个请求)
    urls = ["https://www.baidu.com", "https://www.qq.com"] * 5
    start = time.time()
    # 运行异步爬虫
    asyncio.run(async_crawler(urls))
    print(f"异步爬虫总耗时:{time.time()-start:.2f}秒")

效果对比:10个网络请求,同步爬虫需10+秒,多线程爬虫需3-5秒,异步爬虫仅需1秒左右,并发效率提升显著,完美体现asyncio的实战价值。

六、Python异步编程asyncio:asyncio 高频易错点(必看)

异步函数内调用同步阻塞代码

如 time.sleep()、requests.get(),会阻塞整个事件循环,导致asyncio失效,需替换为异步版本(asyncio.sleep()、aiohttp.get())

直接调用协程函数,未加入事件循环

async def 定义的函数调用后返回协程对象,必须通过 asyncio.run()/create_task() 才能执行,asyncio核心执行逻辑

非协程函数内使用 await 关键字

await 只能用在 async def 定义的协程函数内,普通函数无法使用,asyncio语法基础

七、Python异步编程asyncio:高级技巧 信号量控制并发数

高并发场景下,无限制创建任务会导致服务器/接口被压垮,用 asyncio.Semaphore 设置最大并发数,实现流量控制,是asyncio工程化必备技巧。

import asyncio
import aiohttp

# 定义信号量,限制最大并发数为3
sem = asyncio.Semaphore(3)

async def fetch(session, url):
    # 申请信号量,超出并发数则等待
    async with sem:
        async with session.get(url) as response:
            return f"{url} | {response.status}"

async def main():
    urls = ["https://www.baidu.com"] * 10
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == '__main__':
    asyncio.run(main())

Python异步编程asyncio 延伸学习推荐

深入学习可参考Python官方asyncio模块文档
学完asyncio,可回顾Python多线程与多进程Python模块与包,实现Python并发体系全覆盖。

八、Python异步编程asyncio:核心知识点总结

  • 核心语法async def 定义协程,await 挂起执行,asyncio.run() 启动事件循环(asyncio基础)
  • 并发实现create_task() 封装任务,gather() 批量调度,是异步并发的核心
  • 核心优势:协程轻量、无内核切换开销,单进程支持上万并发,适配纯 I/O 密集型场景
  • 关键控制Semaphore 控并发,timeout_after 控超时,task.cancel() 取消任务
  • 避坑要点:异步内禁用同步阻塞代码,await 只能用在协程函数内,CPU 密集型不单独用 asyncio
  • asyncio是超高并发神器,熟练使用可大幅提升开发效率,适配接口、爬虫、MQ等所有I/O密集场景
本文为「小白编程笔记」原创 · Python 异步编程 asyncio 全解
系列完结:模块包 → 正则 → 多线程多进程 → 异步编程,Python 并发体系全覆盖!

发表评论

滚动至顶部
渝公网安备50022402001073号  |  渝ICP备2026004448号   © 2026 小白编程笔记