asyncio+aiohttp异步IO的简单使用

项目中需要用到Python里的Asyncio模块去异步发生HTTP请求,做一个简单的demo记录。

Asyncio的基础知识可以参考官网文档,建议直接阅读3.7+版本的文档,之前的都太老了,网上搜的例子也很多都是3.7-的,还用的低级API。

demo功能:通过同步、异步两种方式循环调用百度地图API,观察两种方式IO耗时。

一、准备数据源

ak是百度地图API的秘钥,这里准备了3个待查询的地址。

def prepare_data():
    url = "http://api.map.baidu.com/geocoding/v3/"
    ak = "23ZRXDWDGmDRwnOfANHzB7v0hd5Hk6VC"
    params = [
        {
            "address": "北京市海淀区西土城路10号北京邮电大学",
            "output": "json",
            "ak": ak
        },
        {
            "address": "北京市西城区西长安街2号国家大剧院",
            "output": "json",
            "ak": ak
        },
        {
            "address": "北京市东城区钱粮胡同3号东城区人民政府",
            "output": "json",
            "ak": ak
        },
    ]
    return url, params

二、同步IO

sync_do_tasks去遍历3个查询地址,sync_get_postion则通过request模块去发送get请求。

def sync_get_position(url, params):
    res = requests.get(url, params=params)
    data = json.loads(res.text)
    if data['status'] == 0:
        return data['result']
    else:
        return None


def sync_do_tasks():
    url, params = prepare_data()
    results = [sync_get_position(url, p) for p in params]
    for r in results:
        print(r)

三、异步IO

同理,async_do_tasks组装tasks任务,通过gather方法并发执行。

async_get_postion则通过aiohttp模块去实现异步HTTP请求。

async def async_get_position(url, params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as resp:
            return await resp.text()


async def async_do_tasks():
    url, params = prepare_data()
    tasks = [async_get_position(url, p) for p in params]
    response = await asyncio.gather(*tasks)
    for r in response:
        print(r)

四、main

在While循环中分别调用同步、异步代码,并计时。其中异步通过asyncio.run()作为入口。

def now():
    return time.time()
    
def main():
    loggers.init()
    logs = loggers.get(__name__)

    while True:
        try:
            logs.debug("start do sync tasks")
            print("sync task")
            sync_start = now()
            sync_do_tasks()
            sync_time = now() - sync_start
            print(sync_time)
            logs.debug(f"success to do sync tasks in {sync_time} seconds")

            logs.debug("start do async tasks")
            print("async task")
            async_start = now()
            asyncio.run(async_do_tasks())
            async_time = now() - async_start
            print(async_time)
            logs.debug(f"success to do async tasks in {async_time} seconds")
        except Exception as e:
            logs.debug("failed to do tasks")
        finally:
            time.sleep(5)

效果如下:同步、异步两种方式都成功获得response信息,异步耗时小于同步,效果还是比较明显的。

1619420669008.jpg

全部代码如下:

import aiohttp
import asyncio
import json
import requests
import time

import loggers


def now():
    return time.time()


def prepare_data():
    url = "http://api.map.baidu.com/geocoding/v3/"
    ak = "23ZRXDWDGmDRwnOfANHzB7v0hd5Hk6VC"
    params = [
        {
            "address": "北京市海淀区西土城路10号北京邮电大学",
            "output": "json",
            "ak": ak
        },
        {
            "address": "北京市西城区西长安街2号国家大剧院",
            "output": "json",
            "ak": ak
        },
        {
            "address": "北京市东城区钱粮胡同3号东城区人民政府",
            "output": "json",
            "ak": ak
        },
    ]
    return url, params


def sync_get_position(url, params):
    res = requests.get(url, params=params)
    data = json.loads(res.text)
    if data['status'] == 0:
        return data['result']
    else:
        return None


def sync_do_tasks():
    url, params = prepare_data()
    results = [sync_get_position(url, p) for p in params]
    for r in results:
        print(r)


async def async_get_position(url, params):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params=params) as resp:
            return await resp.text()


async def async_do_tasks():
    url, params = prepare_data()
    tasks = [async_get_position(url, p) for p in params]
    response = await asyncio.gather(*tasks)
    for r in response:
        print(r)


def main():
    loggers.init()
    logs = loggers.get(__name__)

    while True:
        try:
            logs.debug("start do sync tasks")
            print("sync task")
            sync_start = now()
            sync_do_tasks()
            sync_time = now() - sync_start
            print(sync_time)
            logs.debug(f"success to do sync tasks in {sync_time} seconds")

            logs.debug("start do async tasks")
            print("async task")
            async_start = now()
            asyncio.run(async_do_tasks())
            async_time = now() - async_start
            print(async_time)
            logs.debug(f"success to do async tasks in {async_time} seconds")
        except Exception as e:
            logs.debug("failed to do tasks")
        finally:
            time.sleep(5)


if __name__ == '__main__':
    main()
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容