本章继续starlette的源码阅读
websockets.py
为下一章endpoint.py
的铺垫
三个简单工具类
class WebSocketState(enum.Enum):
CONNECTING = 0
CONNECTED = 1
DISCONNECTED = 2
class WebSocketDisconnect(Exception):
def __init__(self, code: int = 1000) -> None:
self.code = code
class WebSocketClose:
def __init__(self, code: int = 1000) -> None:
self.code = code
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await send({"type": "websocket.close", "code": self.code})
WebSocket类
主要处理逻辑在uvicorn那边,实际内容很少
class WebSocket(HTTPConnection):
"""
继承于HTTPConnection类,与之前的Request类同源
"""
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
super().__init__(scope)
# ↑ assert scope["type"] in ("http", "websocket")
# self.scope = scope
assert scope["type"] == "websocket"
self._receive = receive
self._send = send
self.client_state = WebSocketState.CONNECTING
self.application_state = WebSocketState.CONNECTING
async def receive(self) -> Message:
"""
接收ASGI websocket消息,确保有效的状态转换.
"""
if self.client_state == WebSocketState.CONNECTING:
# 未建立连接,开始申请连接
message = await self._receive()
message_type = message["type"]
assert message_type == "websocket.connect"
self.client_state = WebSocketState.CONNECTED
return message
elif self.client_state == WebSocketState.CONNECTED:
# 已建立连接,接受"数据"或"断开连接信息"
message = await self._receive()
message_type = message["type"]
assert message_type in {"websocket.receive", "websocket.disconnect"}
if message_type == "websocket.disconnect":
self.client_state = WebSocketState.DISCONNECTED
# 置连接状态为断开
return message
else:
raise RuntimeError(
'Cannot call "receive" once a disconnect message has been received.'
)
# 一旦收到断开连接的消息,就不能调用“receive”
async def send(self, message: Message) -> None:
"""
发送ASGI websocket消息,确保有效的状态转换.
"""
if self.application_state == WebSocketState.CONNECTING:
# 未连接,发送"开启连接信息"或"关闭连接信息"
message_type = message["type"]
assert message_type in {"websocket.accept", "websocket.close"}
if message_type == "websocket.close":
self.application_state = WebSocketState.DISCONNECTED
else:
self.application_state = WebSocketState.CONNECTED
await self._send(message)
elif self.application_state == WebSocketState.CONNECTED:
# 已连接,发送"数据"或"关闭连接信息"
message_type = message["type"]
assert message_type in {"websocket.send", "websocket.close"}
if message_type == "websocket.close":
self.application_state = WebSocketState.DISCONNECTED
await self._send(message)
else:
raise RuntimeError('Cannot call "send" once a close message has been sent.')
# 不能调用“send”一旦关闭消息已发送
async def accept(self, subprotocol: str = None) -> None:
if self.client_state == WebSocketState.CONNECTING:
# 如果我们还没有看到'connect'信息,那么就先等待它.
await self.receive()
await self.send({"type": "websocket.accept", "subprotocol": subprotocol})
def _raise_on_disconnect(self, message: Message) -> None:
if message["type"] == "websocket.disconnect":
raise WebSocketDisconnect(message["code"])
async def receive_text(self) -> str:
assert self.application_state == WebSocketState.CONNECTED
message = await self.receive()
self._raise_on_disconnect(message)
# 判断是否已断开连接
return message["text"]
async def receive_bytes(self) -> bytes:
assert self.application_state == WebSocketState.CONNECTED
message = await self.receive()
self._raise_on_disconnect(message)
return message["bytes"]
async def receive_json(self, mode: str = "text") -> typing.Any:
assert mode in ["text", "binary"]
assert self.application_state == WebSocketState.CONNECTED
message = await self.receive()
self._raise_on_disconnect(message)
if mode == "text":
text = message["text"]
else:
text = message["bytes"].decode("utf-8")
return json.loads(text)
async def iter_text(self) -> typing.AsyncIterator[str]:
try:
while True:
yield await self.receive_text()
except WebSocketDisconnect:
pass
async def iter_bytes(self) -> typing.AsyncIterator[bytes]:
try:
while True:
yield await self.receive_bytes()
except WebSocketDisconnect:
pass
async def iter_json(self) -> typing.AsyncIterator[typing.Any]:
try:
while True:
yield await self.receive_json()
except WebSocketDisconnect:
pass
async def send_text(self, data: str) -> None:
await self.send({"type": "websocket.send", "text": data})
async def send_bytes(self, data: bytes) -> None:
await self.send({"type": "websocket.send", "bytes": data})
async def send_json(self, data: typing.Any, mode: str = "text") -> None:
assert mode in ["text", "binary"]
text = json.dumps(data)
if mode == "text":
await self.send({"type": "websocket.send", "text": text})
else:
await self.send({"type": "websocket.send", "bytes": text.encode("utf-8")})
async def close(self, code: int = 1000) -> None:
await self.send({"type": "websocket.close", "code": code})