关键字:Redis NoSQL Flask RQ Flask_Cache cache
非关系型数据库 内存型数据库 RQ队列 缓存
之前上线了的一个应用 -- Framework7+Vue+Flask开发实战 - PT保种管理系统1 - 概述,由于经常要在海量数据中统计、查询,随着数据逐渐增多,每次数据库访问等待时间从几百ms,增长到10几秒!
所以,必须引入Cache机制。
目光转向目前 流行的Redis --> 内存型NoSQL数据库
结果比较:速度提升167倍!
测试程序:
from timeit import timeit
timeit(stmt="[Ob.query.get(i) for i in range(10000)]", setup="from __main__ import Ob", number=10)
# PostgreSQL 25.15s
timeit(stmt="[r.get(i) for i in range(10000)]", setup="from __main__ import r", number=10)
# Redis 9.32s
timeit(stmt="[pipe.get(i) for i in range(10000)]; pipe.execute()", setup="from __main__ import r; pipe=r.pipeline(False)", number=100)
# Redis Pipe 0.15s
读取速度:62倍 167倍
timeit(stmt="[Cookie.query.filter_by(key='sys_status').update({'val':i}) for i in range(10000)]", setup="from app.models import Cookie", number=1)
# 20s
timeit(stmt="[r.set('tmp:%d'%i, i) for i in range(10000)]", setup="from __main__ import r;", number=10)
# 9.17s
timeit(stmt="[pipe.set('tmp:%d'%i, i) for i in range(10000)]; pipe.execute()", setup="from __main__ import r; pipe=r.pipeline(False)", number=10)
# 0.21s
写入速度:43倍 95倍
以上是本地测试数据。如果你使用SaaS网上托管主机,差距更加明显!因为它们的数据库一般不在本地,像Heroku,数据库在AWS,网络耗时又增加不少(慢2倍以上)。
如何在我们的框架中引入Redis:
之前我们的Flask后台框架:Vue 2.0 起步(4) 轻量级后端Flask用户认证 - 微信公众号RSS
引入RQ,参考:https://beenje.github.io/blog/posts/running-background-tasks-with-flask-and-rq/
# /app/main/views.py
import redis
from rq import push_connection, pop_connection, Queue
from rq.job import Job
from . import tasks, ob_api
def get_redis_connection():
redis_connection = getattr(g, '_redis_connection', None)
if redis_connection is None:
redis_url = current_app.config['REDIS_URL']
redis_connection = g._redis_connection = redis.from_url(redis_url)
return redis_connection
@main.before_request
def push_rq_connection():
push_connection(get_redis_connection())
app_status = r.hget('status', 'app')
if app_status and app_status.decode('ascii')=='idle':
q = Queue()
task_list = ['ob_sync', 'db2redis', 'ob_seeding_sync' ]
if (r.hget('status', 'tasks:ob_sync') is None) or (q.fetch_job(r.hget('status', 'tasks:ob_sync').decode('ascii')) is None): # 同步OB 种子
job = q.enqueue_call(func=tasks.ob_sync, args=('all',), timeout=3600, result_ttl=5*3600) # 结果缓存6*3600
print('start queue: tasks.ob_sync...', job.get_id())
r.hset('status', 'app', 'ob_sync')
r.hset('status', 'tasks:ob_sync', job.get_id())
if (r.hget('status', 'tasks:db2redis') is None) or (q.fetch_job(r.hget('status', 'tasks:db2redis').decode('ascii')) is None): # 检查 db2redis有没有结果
# job2
pass
if (r.hget('status', 'tasks:ob_seeding_sync') is None) or (q.fetch_job(r.hget('status', 'tasks:ob_seeding_sync').decode('ascii')) is None): # 串行
# job3
pass
elif app_status:
q = Queue()
task_list = ['ob_sync', 'db2redis', 'ob_seeding_sync' ]
for task in task_list:
if r.hget('status', 'tasks:%s'%task):
job_key = r.hget('status', 'tasks:%s'%task).decode('ascii')
job = q.fetch_job(job_key)
if job:
print(task, job_key, job.status, job.result)
if job.status in ['finished', 'failed'] : r.hset('status', 'app', 'idle')
else: r.hset('status', 'app', task)
else:
r.hset('status', 'app', 'idle')
@main.teardown_request
def pop_rq_connection(exception=None):
pop_connection()
Flask_Cache
另外,对于一些经常访问且变化不大的路由(views),可以引入Flask_Cache 高速缓存:
配置文件引入Redis server:
# config.py
class Config:
REDISTOGO_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')
app初始化时,引入flask_cache,flask_redis:
## /app/__init__.py
from flask_cache import Cache
from flask_redis import FlaskRedis
from urllib import parse
cache = Cache()
r = FlaskRedis()
def create_app(config_name):
parse.uses_netloc.append("redis")
redis_url = parse.urlparse(app.config['REDISTOGO_URL'])
cache.init_app(app, config={
'CACHE_TYPE': 'redis',
'CACHE_KEY_PREFIX': 'fcache',
'CACHE_REDIS_HOST': redis_url.hostname,
'CACHE_REDIS_PORT': redis_url.port,
'CACHE_REDIS_USERNAME': redis_url.username or '',
'CACHE_REDIS_PASSWORD': redis_url.password or '',
# 'CACHE_REDIS_URL': app.config['REDISTOGO_URL'],
})
r.init_app(app)
路由里就可以按需使用cache装饰器了:
# /app/main/views.py
import redis
from rq import push_connection, pop_connection, Queue
from rq.job import Job
from .. import db, admin, cache, r
@cache.memoize(timeout=20)
def query_db():
time.sleep(3)
r.set('query_db', time.ctime())
return time.ctime() # you must return something, otherwise Cache will not work
@main.route("/cache")
def cache_view():
app = current_app._get_current_object()
with app.app_context():
start = time.time()
query_db()
return "Results from DB in {:.2f}sec".format(time.time()-start)
@main.route('/api/ob_report', methods=['GET', 'POST'])
@login_required
@cache.cached(timeout=50)
def api_ob_report():
app = current_app._get_current_object()
rsp = ob_api.ob_report(app)
return jsonify(code=rsp['code'], msg=rsp['msg'], sys_status=rsp['sys_status'])
注:Redis只在Linux平台可用,如果你用Windows:
2017数据库排名: