endpoints.py
关于cbv的源码,分http的cbv和ws的cbv两种
HTTPEndpoint类
class HTTPEndpoint:
"""
cbv,老朋友了。本身属于一个app。
可以看到原理是极其简单的
"""
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] == "http"
self.scope = scope
self.receive = receive
self.send = send
def __await__(self) -> typing.Generator:
return self.dispatch().__await__()
async def dispatch(self) -> None:
# 分发请求
# 通常cbv会重写get post等方法,而请求传入到cbv实例中,究竟该调用哪个方法。
# 会在此进行派送
request = Request(self.scope, receive=self.receive)
handler_name = "get" if request.method == "HEAD" else request.method.lower()
# 如果是head方法就转换成get方法
handler = getattr(self, handler_name, self.method_not_allowed)
# 用getattr的方式找同名方法
is_async = asyncio.iscoroutinefunction(handler)
# 判断是否为协程,寻找相应调用方式
if is_async:
response = await handler(request)
else:
response = await run_in_threadpool(handler, request)
await response(self.scope, self.receive, self.send)
async def method_not_allowed(self, request: Request) -> Response:
# 如果我们在starlette应用中运行, 且抛出了一个错误
# 那么可配置的异常处理程序, 便可处理返回的response并触发一场
# 对于简单的ASGI程序来说, 只仅仅返回response即可
# routing.py里完全一致的原文,直接把翻译超过来了
if "app" in self.scope:
raise HTTPException(status_code=405)
return PlainTextResponse("Method Not Allowed", status_code=405)
# 判断报错方式
WebSocketEndpoint类
class WebSocketEndpoint:
encoding: typing.Optional[str] = None # 可能是 "text", "bytes", 或者 "json".
def __init__(self, scope: Scope, receive: Receive, send: Send) -> None:
assert scope["type"] == "websocket"
self.scope = scope
self.receive = receive
self.send = send
def __await__(self) -> typing.Generator:
return self.dispatch().__await__()
async def dispatch(self) -> None:
websocket = WebSocket(self.scope, receive=self.receive, send=self.send)
await self.on_connect(websocket)
# 执行重写的on_connect方法
# 建立ws连接
close_code = status.WS_1000_NORMAL_CLOSURE
try:
while True:
# 不断接受信息,直到断开连接
message = await websocket.receive()
if message["type"] == "websocket.receive":
data = await self.decode(websocket, message)
await self.on_receive(websocket, data)
# 执行重写的on_receive方法
elif message["type"] == "websocket.disconnect":
close_code = int(message.get("code", status.WS_1000_NORMAL_CLOSURE))
break
except Exception as exc:
close_code = status.WS_1011_INTERNAL_ERROR
raise exc from None
finally:
# 将发生异常或者断开连接的状态码,传给重写的on_disconnect方法
await self.on_disconnect(websocket, close_code)
async def decode(self, websocket: WebSocket, message: Message) -> typing.Any:
if self.encoding == "text":
if "text" not in message:
await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
raise RuntimeError("Expected text websocket messages, but got bytes")
return message["text"]
elif self.encoding == "bytes":
if "bytes" not in message:
await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
raise RuntimeError("Expected bytes websocket messages, but got text")
return message["bytes"]
elif self.encoding == "json":
if message.get("text") is not None:
text = message["text"]
else:
text = message["bytes"].decode("utf-8")
try:
return json.loads(text)
except json.decoder.JSONDecodeError:
await websocket.close(code=status.WS_1003_UNSUPPORTED_DATA)
raise RuntimeError("Malformed JSON data received.")
assert (
self.encoding is None
), f"Unsupported 'encoding' attribute {self.encoding}"
return message["text"] if message.get("text") else message["bytes"]
async def on_connect(self, websocket: WebSocket) -> None:
"""重写当websocket连接的方法"""
await websocket.accept()
async def on_receive(self, websocket: WebSocket, data: typing.Any) -> None:
"""重写当websocket收到消息的方法"""
async def on_disconnect(self, websocket: WebSocket, close_code: int) -> None:
"""重写当websocket断开连接的方法"""
做一个小统计
行数统计
做了下统计,
applications.py
141,endpoints.py
88,requests.py
206,responses.py
275,routing.py
516,websockets.py
118现在已经过了1344行,源码总共有4157行,还任重而道远啊。下一章决定啃最大一块
datastructures.py
。作为经验尚浅的初学者,对python高深境界的了解还远远不足。尚待努力。