协程的原理可以参考https://www.jianshu.com/p/e5d6b5d63654
这里我们编写一个简单的异步tcp_client包,来加深对协程的理解。
tcp连接在创建套接字后,根据网络的延迟,需要一定时间和对端建立tcp连接,在写入数据时,如果发送缓冲区满了,需要等待发送完毕,读取数据时,需要等待到数据到达时才可读。因此,connect、read、send等函数都必须异步处理。
import socket
import asyncio
from asyncio import Future
from typing import (
Awaitable,
)
class IOStream():
def __init__(self, socket: socket.socket) -> None:
self.socket = socket
self.socket.setblocking(False)
self.io_loop = asyncio.get_event_loop()
def connect(self, address: tuple) -> None:
future = Future()
try:
self.socket.connect(address)
except BlockingIOError:
pass
def _handle_connect():
self.io_loop.remove_writer(self.socket)
future.set_result(None)
self.io_loop.add_writer(self.socket, _handle_connect)
return future
def write(self, data: bytes) -> None:
future = Future()
def _handle_write():
self.io_loop.remove_writer(self.socket)
# 正常流程里,这里是要根据缓冲区大小来陆续send的,本对象只是模拟协程的工作,所以不处理细节
self.socket.sendall(data)
future.set_result(None)
self.io_loop.add_writer(self.socket, _handle_write)
return future
def read_bytes(self, num_bytes: int) -> Awaitable[bytes]:
future = Future()
def _handle_read():
self.io_loop.remove_reader(self.socket)
data = self.socket.recv(num_bytes)
future.set_result(data)
self.io_loop.add_reader(self.socket, _handle_read)
return future
def close(self) -> None:
if self.socket:
self.socket.close()
self.socket = None
编写个简单的函数测试
if __name__ == '__main__':
async def main():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = IOStream(s)
await stream.connect(("localhost", 8000))
print("connected")
await stream.write(b"GET / HTTP/1.0\r\nHost: localhost\r\n\r\n")
print("writed")
body_data = await stream.read_bytes(1024)
print("readed")
print(body_data)
stream.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([main(), main()]))
查看控制台,可以看到多个tcp连接是同时发出