Tornado是异步非阻塞Web框架,能抗住每秒可以处理数以千计的连接。背后使用了Epoll,是一个高性能Web框架。
但是在使用Tornado作为框架来做业务逻辑编写,就会发现,虽然能抗住数以千计的连接,Tornado只有一个主线程。而编写业务肯定需要有耗时的过程逻辑,比如数据库的操作。这个时候,单线程的Tornado就特别容易阻塞在耗时逻辑上。
要想很好的适用业务逻辑,我的设计是基于Tornado引入线程池,并且能方便操作数据库,对Sqlalchemy包装一下。目前这种设计经历了1年多的项目运行,稳定运行。特别适合高并发不是特别要求高的业务情况。
多线程装饰器
建立线程池很简单,编写装饰器 @thread_executor,这样就可以很轻易的挂载装饰器来把业务逻辑压入线程池中运行。
executor = ThreadPoolExecutor(8)
def thread_executor(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
future = executor.submit(fn, self, *args, **kwargs)
return future
return wrapper
多线程带Session数据库操作装饰器
sqlalchemy_session = sessionmaker(bind=engine)
def thread_db_session_executor(fn):
"""
fn 是一个方法, @thread_db_session_executor 装饰后,将自动转入线程池里面运行。
fn 的参数会自动追加一个session,为sqlalchemy的DB数据操作。 例如 原为fn(a,b) 会变成fn(a,b,session)
fn 运行完成后 session自动关闭,请不要再fn内关闭。
如果 fn 运行异常,数据库会自动回滚,fn返回值变成异常对象。
"""
@thread_executor
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
start_time = time.time()
try:
session = sqlalchemy_session()
new_fn = functools.partial(fn, session=session)
result = new_fn(self, *args, **kwargs)
except Exception as e:
try:
logging.exception(
"\n%s \n--function-->>\n%s -args: %s \n%s -kwargs: %s\n<<--function--\n"
% (str(e), fn.__name__, str(args), fn.__name__, str(kwargs))
)
session.rollback()
except Exception as e2:
pass
result = e
finally:
session.close()
return result
return wrapper
封装Tornado的RequestHandler(json版本)
class SimpleThreadDBSessionHandler(tornado.web.RequestHandler):
"""
SimpleThreadDBSessionHandler 实例化后,会在线程池里面获得一个线程,并且获得DB的一个session(操作数据库)。
重载 on_post, on_get 来实现业务,方法执行线程安全。执行完成后,线程自动回收,session自动关闭。
post request body 是json格式
post get response body 是json
返回结果 也是 json
"""
@tornado.web.asynchronous
@tornado.gen.coroutine
def post(self):
try:
post_body_data = self.measure_post_body()
result = yield self._on_post(post_body_data)
except Exception as e:
result = e
print e
if isinstance(result, Exception) or result is None:
exception_response_result = self.exception_response(result)
if exception_response_result is None:
raise result
else:
result = exception_response_result
self.write(self.measure_response_body(result))
self.finish()
@thread_db_session_executor
def _on_post(self, post_body_data, session):
return self.on_post(post_body_data, session)
def on_post(self, post_body_data, session):
"""
请重载此函数处理
"""
raise tornado.web.HTTPError(405)
def measure_post_body(self):
"""
处理 request body的数据 转成json格式字典
:return:
"""
return utils_json_string_to_dict(self.request.body.decode('utf-8'))
通过以上的编写,在编写业务逻辑的时候,使用就特别方便。
class TestHandler(SimpleThreadDBSessionHandler):
def on_post(self, post_body_data, session):
user = session.query(User).filter(User.id == 1).first()
return {
'user_name': user.nickname
}
业务逻辑(POST) 只要在def on_post(self, post_body_data, session)里面实现就可以了,
on_post 带有一个操作数据库的session,并且通过使用@thread_db_session_executor的,on_post的逻辑就放入线程池里面运行,完成以后再返回到主线程里面返回。
就算中间发生异常,异常也会被捕捉,数据库回滚。