不用写事件循环,所设置的函数会在相应的事件发生时被调用
方便写异步或同步代码
基础
consumer包含了一系列的对应消息类型的函数,把其中的 . 和 _ 相互转换
AsyncConsumer
def __init__(self, scope):
self.scope = scope
async def __call__(self, receive, send):
调用时,设置channel_layer,调用channel_layer的new_channel方法构造并设置channel_name,传入channel_name参数构建channel_receive
对于本类 异步 则send即为参数中传入的send
监听:
await await_many_dispatch(
[receive, self.channel_receive], self.dispatch
)
或
# 针对没有channel_layer的
await await_many_dispatch([receive], self.dispatch)
在dispatch函数中根据message["type"]找到类中相应的处理函数
SyncConsumer
相比异步的:
send函数用async_to_sync转换了
dispatch加了database_sync_to_async转换
通用类
AsyncHttpConsumer
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body = []
使用http_request处理http.request事件,在接收了完整的body后调用handle
async def http_request(self, message):
if "body" in message:
self.body.append(message["body"])
if not message.get("more_body"):
try:
await self.handle(b"".join(self.body))
finally:
await self.disconnect()
raise StopConsumer()
另有http_disconnect处理http.disconnect事件
如想在连接关闭之际做出处理,可在disconnect函数中实现
关于发送响应,提供了send_response函数,
在其中调用send_headers,发送http.response.start消息
在其中调用send_body,发送http.response.body消息
AsyncWebsocketConsumer
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.groups is None:
self.groups = []
实现了websocket_connect
被调用时如果设置了groups会调用channel_layer.group_add
之后调用connect,调用accept
实现了websocket_receive
根据message中具体内容,调用receive,传入message["text"]或message["bytes"]
实现了websocket_disconnect
被调用时如果设置了groups会调用channel_layer.group_discard
之后调用disconnect
实现了send,用于发送消息
如果想自定义函数,可重载connect accept receive close
WebsocketConsumer
和上述异步类似,注意写法转换
关于异步 async
def await_many_dispatch(consumer_callables, dispatch):
"""
Given a set of consumer callables, awaits on them all and passes results
from them to the dispatch awaitable as they come in.
"""
# Start them all off as tasks
loop = asyncio.get_event_loop()
tasks = [
loop.create_task(consumer_callable())
for consumer_callable in consumer_callables
]
try:
while True:
# Wait for any of them to complete
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# Find the completed one(s), yield results, and replace them
for i, task in enumerate(tasks):
if task.done():
result = task.result()
await dispatch(result)
tasks[i] = asyncio.ensure_future(consumer_callablesi)
finally:
# Make sure we clean up tasks on exit
for task in tasks:
task.cancel()
try:
await task
except CancelledError:
pass