应用场景
我目前开发一个django项目,前段时间接触到一个需求,需要做大量耗时任务的异步处理,之前这个功能是用threading直接做的,众所周知,threading做异步,一旦服务挂掉或者重启,任务会全部丢失,所以需要其他解决办法
为什么用rq
本来,这个需求直指消息队列,所以我打算用RabbitMQ+Celery去做,正准备操起键盘就是干,一通梭,正此时,前端同事凑过来跟我说,朋友,你有没有听过rq?
rq?所谓有啥不会,百度全对,于是我冲进rq官方文档。
RQ(Redis Queue),人如其名,用 redis 做的队列任务
redis ,众所周知, 它的列表可以做队列,rq就是把job放进队列里,然后启worker挨个做完
另外rq极其简单,官方文档短小精悍,容易上手
手把手教你Django_rq
安装
直接 pip install django-rq
会自动安装依赖库 rq
配置
setting.py
#引入django_rq,使得python manage.py 命令可以使用
INSTALLED_APPS = [
# other apps
"django_rq",
]
#配置队列
RQ_QUEUES = {
"default": {
"HOST":"127.0.0.1",
"PORT":6379,
"DB":1,
"URL": os.getenv(
"REDISTOGO_URL", "redis://127.0.0.1:6379/1"
), # If you're on Heroku
},
"high": {
"HOST":"127.0.0.1",
"PORT":6379,
"DB":1,
"URL": os.getenv(
"REDISTOGO_URL", "redis://127.0.0.1:6379/1"
), # If you're on Heroku
},
"low": {
"HOST":"127.0.0.1",
"PORT":6379,
"DB":1,
"URL": os.getenv(
"REDISTOGO_URL", "redis://127.0.0.1:6379/1"
), # If you're on Heroku
},
}
#default, high, low 表示队列的优先级,如果同时入队,优先执行high队列的,其次执行default队列的,最后执行low
# 但是如果一大批low队列的job在执行的话,此时high队列开始入队job,不会马上下一个任务就开始执行high队列,而是会继续执行low队列,直至low队列任务执行完毕
# "HOST":"127.0.0.1", "PORT":6379,"DB":1, 是在配置redis数据库
#还有其他参数可以配置,具体可以看看django_rq官方文档,附上源码和文档https://gitee.com/mirrors/django-rq?_from=gitee_search
urls.py
urlpatterns += [
path(r'admin/django-rq/', include('django_rq.urls')),
]
#把 rq 队列数据详情放入django admin后台管理中,如下图
开始撸代码
import django_rq
salt = uuid.uuid1()
rq_id = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S") + salt
#job_id可以自己制定也能自动生成,但一定注意不能重名,否则重名的任务会覆盖
queue = django_rq.get_queue(
"default", autocommit=True, is_async=True, default_timeout=3600
)
#拿到队列,进入之前setting.py设置的"default"队列,异步设置为True(默认的,可以不设置),default_timeout = 3600(任务超时为3600s,按照队列该任务开始执行开始算起)
queue.enqueue(
"apps.xxx.xxx.xxxx.views.task", args=(task_arg1, task_arg2), result_ttl=31536000, job_id=rq_id
)
#入队,apps.xxx.xxx.xxxx.views是文件名称,task是函数名称,函数必须是独立的函数,否则在类中定义的话无法识别,也可以直接引入就在当前文件的函数,不必加引号
#args=(task_arg1, task_arg2)存放的是task的参数,参数可以为多个,但必须是基础类型,不能是对象,类,
# result_ttl=31536000是指执行完毕之后结果在redis数据库中保留的时间
#还有其他的参数,大家可以自己看看rq文档http://python-rq.org/
#django_rq还支持 @job来自动入队
执行任务
python manage.py rqworker high default low
#在 high default low三个队列各自启动一个worker,注意了,由于django_rq调用linux中fork(),所以只能在linux系统中执行,windos可以尝试win10的子系统
线上环境启动
ubuntu为例
写systemctl 启动配置
sudo vi /etc/systemd/system/rqworker.service
内容
[Unit]
Description=Django-RQ Worker
After=network.target
[Service]
WorkingDirectory=<<path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python \
<<path_to_your_project_folder>>/manage.py \
rqworker high default low
[Install]
WantedBy=multi-user.target
执行命令
sudo systemctl enable rqworker
sudo systemctl start rqworker
注意,无论那种方式只是启动了一个work,想要启动多个work 需要多线程执行 python manage.py rqworker high default low
大家可以自己写脚本实现,可以参考这篇文章http://xiaorui.cc/archives/2334,可以使用supervisord 来管理进程
查看队列执行状况
多种办法
1.配置好的django admin中查看,种类齐全,最佳查看方式
2.python manage.py rqstats
python manage.py rqstats --interval=1 #每秒刷新(其实刷新并不及时)
python manage.py rqstats --json # 输出JSON
python manage.py rqstats --yaml # 输出YAML
3. 进入redis库中可以看到自己的队列,worker,以及job(作为辅助验证使用)
4.rq 也有命令可以启动 和查看状态,大家可以自己看看
其他
worker其实也可以自己手写启动脚本,很多参数都可以自己配置,如果看到django_rq的源码就会发现django_rq就是简单集成django,功能基本都是调用rq实现,甚至rq都可以自己重构,添加想要的功能,工具在于使用,不必完全遵循。