Django3.1异步视图+aiomysql+channels实现小游戏

Django3.1异步视图+aiomysql+channels实现小游戏

最近有需求需要开发一款网页答题小游戏,实现实时对战的功能,首先想到使用tornado高并发异步框架去实现websocket,可是就是这个时候django3.1正式版发布了,说他来的早不如说他来得巧,既然方便强大的django支持异步视图了那为什么还要去花时间研究tornado,django3.x实现asgi接口自然可以实现websocket,但是考虑开发成本,最终还是选择使用channels实现websocket。考虑到公司业务,这里就只写websocket的实现吧。

项目依赖的主要环境

  1. python >= 3.7
  2. django >= 3.1
  3. channels >= 2.4
  4. aiomysql
  5. uvicorn

channels配置

1. 假设python和django环境已经就绪且项目已经初始化

2. 安装channels

pip install channels

3. 在app中注册channels

# settings.py
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    ...
    'channels',
)

4. 创建路由配置

在工程目录下创建routing.py(myproject/routing.py和settings.py同级)

# myproject/routing.py
from channels.auth import AuthMiddlewareStack
# 继承settings中的allow host
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(  # 中间件
            URLRouter(
            # 这里配置websocket的路由
            )
        ),
    )
})

5. 配置路由指向并且启用通道层

通道层可以理解为为每一个ws连接做一个唯一映射存在全局,这样可以在django的任何地方使用通道层给指定的连接发送消息
使用通道层需要安装channels_redis:pip install channels_redis

# settings.py
ASGI_APPLICATION = 'myproject.routing.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

6. 创建websocket的app

1. 注册app
# settings.py
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    ...
    'app',
    'channels',
)
2. 在app中创建消费者文件(consumers.py)
# app/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    
    # 建立连接的回调
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_id']
        self.room_group_name = 'chat_%s' % self.room_id

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    # 断开连接的回调
    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # 收到消息的回调
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # 发送消息指定的处理方法
    async def chat_message(self, event):
        message = event['message']

        # Send message to WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))
3. 在app中创建websocket路由文件(routing.py)
1.创建路由文件
# app/routing.py
from django.urls import re_path

from answer_game import consumers

websocket_urlpatterns = [
    re_path(r'ws/game/controller/(?P<room_id>\w+)/(?P<user_id>\d+)$', consumers.GameController),
]
2. 创建自定义中间件
class WebSocketAuthMiddleware:
    def __init__(self, inner):
        # 存储通过的ASGI应用程序
        self.inner = inner

    def __call__(self, scope):

        # 关闭旧的数据库连接,以防止使用超时的连接
        close_old_connections()

        # 自定义校验逻辑,获取子协议内容
        protocol = dict(scope['headers']).get(b'sec-websocket-protocol', b'').decode()
        # 处理子协议
        # 这里塞进scope的值可以在消费者方法中直接获取
        # 关于scope的更多信息可以参考官方文档 https://channels.readthedocs.io/en/latest/topics/consumers.html?highlight=scope#scope
        accept = False if not protocol else True
        
        # 直接返回内部应用程序并让它继续运行
        return self.inner(dict(scope, accept=accept))
2. 注册路由,使用自定义中间件
# myproject/routing.py
# 这里是自定义中间件,也可以换成自带的auth中间件,因为本次使用了websocket子协议protocol,所以使用自定义中间件做安全校验
from costudy_answer_game.WebSocketAuthMiddleware import WebSocketAuthMiddleware
# 这里就是自带的auth中间件,没有用到
from channels.auth import AuthMiddlewareStack
# 继承settings中的allow host
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
from app import routing

application = ProtocolTypeRouter({
    # (http->django views is added by default)
    'websocket': AllowedHostsOriginValidator(
        WebSocketAuthMiddleware(
            URLRouter(
                # 这里配置websocket的路由
                routing.websocket_urlpatterns
            )
        ),
    )
})

7. 修改消费者方法接受子协议

# app/consumers.ChatConsumer.connect
await self.accept('your_protocol')

8. 创建asgi文件

在工程目录下创建asgi.py

# myproject/asgi.py
import os
import django

from django.core.asgi import get_asgi_application
from channels.routing import get_default_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = get_asgi_application()

django.setup()
ws_application = get_default_application()

到这里websocket的配置就已经完成里


使用aiomysql

django3.1目前还不支持异步orm,虽然官方文档说明可以使用sync_to_async转换同步方法为异步方法从而使用同步orm,但是实际开发中会遇到很多复杂的聚合查询等,此时再使用这种方法会很麻烦,而且甚至使用不了(是我使用不了,可能是我使用方法不对),所以我选择了aiomysql,aiomysql的官方文档在这 https://aiomysql.readthedocs.io/en/latest/connection.html#

当然也可以使用异步orm框架tortoise-orm,附上文档连接 https://tortoise-orm.readthedocs.io/en/latest/
但是由于调研时间问题项目里还是使用了原生sql,而且我始终不愿意在django里使用其他的orm,这是我对django orm的尊重,期望django能更快的支持异步orm

1.安装aiomysql

pip install aiomysql

2. 配置aiomysql

# settings.py
class DBcontroller:
    db1_engine = None
    db2_engine = None
    isinstance = False

    def __new__(cls, *args, **kwargs):
        if cls.isinstance:  # 如果被实例化了
            return cls.isinstance  # 返回实例化对象
        print('connecting to database...')
        loop = asyncio.get_event_loop()
        asyncio.run_coroutine_threadsafe(DBcontroller.connect(), loop)
        cls.isinstance = object.__new__(cls)  # 否则实例化
        return cls.isinstance  # 返回实例化的对象

    @staticmethod
    async def connect():
        try:
            db1_aiomysql_config = dict(
                host=os.environ.get('MYSQLHOST', '127.0.0.1'),
                port=int(os.environ.get('MYSQLPORT', 3306)),
                user=os.environ.get('MYSQLUSER', 'root'),
                password=os.environ.get('MYSQLPWD'),
                maxsize=100,
                db='dbname1',
                # echo=True
            )
            db2_aiomysql_config = db1_aiomysql_config.copy()
            db2_aiomysql_config.update({'db': 'dbname2'})

            db1_engine = await aiomysql.create_pool(**study_aiomysql_config)
            db2_engine = await aiomysql.create_pool(**game_aiomysql_config)
            if db1_engine and db2_engine:
                DBcontroller.db1_engine = db1_engine
                DBcontroller.db2_engine = db2_engine
                DBcontroller.connectStatue = True
                print('connect to mysql success!')
            else:
                raise ("connect to mysql error ")
        except:
            print('connect error.', exc_info=True)

db = DBcontroller()

3. 使用

# app/views.py
from django.shortcuts import  HttpResponse
async def get_user_id_by_uid(request):
    uid = request.GET.get('uid')
    async with await db.study_engine.acquire() as coon:  # type: aiomysql.connection.Connection
        async with coon.cursor() as cur:  # type: aiomysql.cursors.Cursor
            sql = "SELECT nickname FROM user WHERE id=%s"
            await cur.execute(sql, uid)
            rel = await cur.fetchone()
            if rel:
                user_id, = rel
            else:
                user_id = 0
    db.study_engine.release(coon)
    rel = {"user_id":user_id}
    return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')

4. 封装使用

每一次执行sql都要写这么多代码,太麻烦了,那就封装一下吧
创建公共方法文件utils.py

class DBExecute(object):
    def __init__(self, sql: str, params=None, return_type='tuple'):
        """
        执行study数据库的sql
        :param sql: sql语句
        :param params: sql的参数
        :param return_type: 返回的数据类型
        """
        self.sql = sql
        self.params = params
        self.return_type = return_type

    async def fetchone(self) -> (tuple, dict):
        async with await db.study_engine.acquire() as conn:  # type: aiomysql.connection.Connection
            if self.return_type == 'dict':
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    if self.params is None:
                        await cur.execute(self.sql)
                    else:
                        await cur.execute(self.sql, self.params)
                    rel = await cur.fetchone()  # type: dict
            else:
                async with conn.cursor() as cur:  # type: aiomysql.cursors.Cursor
                    if self.params is None:
                        await cur.execute(self.sql)
                    else:
                        await cur.execute(self.sql, self.params)
                    rel = await cur.fetchone()  # type: tuple
        db.study_engine.release(conn)
        return rel

    async def fetchall(self) -> list:
        async with await db.study_engine.acquire() as conn:  # type: aiomysql.connection.Connection
            if self.return_type == 'dict':
                async with conn.cursor(aiomysql.DictCursor) as cur:  # type: aiomysql.cursors.Cursor
                    if self.params is None:
                        await cur.execute(self.sql)
                    else:
                        await cur.execute(self.sql, self.params)
                    rel = await cur.fetchall()
            else:
                async with conn.cursor() as cur:  # type: aiomysql.cursors.Cursor
                    if self.params is None:
                        await cur.execute(self.sql)
                    else:
                        await cur.execute(self.sql, self.params)
                    rel = await cur.fetchall()
        db.study_engine.release(conn)
        return rel

这里只封装了一个db的执行方法,在settings.py中配置了两个db,这个是根据业务调整的
现在再看一下刚刚的视图函数可以怎么写

# app/views.py
from django.shortcuts import  HttpResponse
from common.utils import db
async def get_user_id_by_uid(request):
    uid = request.GET.get('uid')
    sql = "SELECT nickname FROM user WHERE id=%s"
    rel = await DBExecute(sql, uid).fetchone()
    if rel:
        user_id, = rel
    else:
        user_id = 0
    rel = {"user_id":user_id}
    return HttpResponse(json.dumps(rel, ensure_ascii=False), content_type='application/json')

启动项目

uvicorn --host 127.0.0.1 --port 8000 --workers 1 myproject.asgi:ws_application

总结

算了,不会总结,有问题就谷歌

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351