索引
本节内容将分析Tornado内置的异步HTTP客户端, 包括源码和Tornado官方的爬虫demo. AsyncHTTPClient
与SimpleAsyncHTTPClient
两个类是异步客户端的应用层代码, 下面就来分析其中的源码, 更底层的代码分析留待日后填坑.
简介与说明
Tornado内置了一个没有外部依赖的基于tornado iostreams
的非阻塞 HTTP 1.1 客户端,
默认情况下, 为了限制挂起连接
的数量, 单个IOLoop
只有一个AsyncHTTPClient
实例. 当然也可以通过设置force_instance=True
来取消这个限制, 然而并不建议这么做. 默认情况下, 只有第一次调用下面这种方式初始化客户端时, 配置才会生效.
AsyncHTTPClient(max_clients=20, defaults=dict(user_agent="WtfAgent"))
原因在上面也提到了, AsyncHTTPClient 是"隐式复用"的, 除非你设置了force_instance
取消"单实例"限制, 所以更推荐下面这种配置方式
tornado.httpclient.AsyncHTTPClient.configure(None,
defaults=dict(
user_agent="MyUserAgent"
),
max_clients=20,
)
max_clients
是实例的最大同时请求数, 超过限制的请求将会被放入队列中, 需要注意的是, 在队列中等待的时间也是计入超时计时中的, 比方说设定的超时时间是2秒, 如果排队时间超过2秒 , 即便这个请求实际上并没有发出, 那么也会被认定为超时
源码分析
AsyncHTTPClient
主要看fetch
, 其内部的操作如下:
- 新建一个
future
- 统一请求格式, 如果输入的是
url
那么需要封装成HTTPRequest
- 为请求添加回调(具体操作是, 先为
future
添加回调,future
完成后执行自身回调,future
的回调再在ioloop
注册用户定义的回调) - 执行
fetch_impl
, 这个函数实际上调用的是AsyncHTTPClient
的子类SimpleAsyncHTTPClient
的方法,fetch_impl
完成后会执行future.set_result
的回调
class AsyncHTTPClient(Configurable):
def fetch(self, request, callback=None, raise_error=True, **kwargs):
# 实际上并不需要显式close, 除非用多实例, 一般情况不会关异步客户端
if self._closed:
raise RuntimeError("fetch() called on closed AsyncHTTPClient")
# 统一请求格式
if not isinstance(request, HTTPRequest):
request = HTTPRequest(url=request, **kwargs)
request.headers = httputil.HTTPHeaders(request.headers)
request = _RequestProxy(request, self.defaults)
future = TracebackFuture()
# 注册回调
if callback is not None:
callback = stack_context.wrap(callback)
def handle_future(future):
exc = future.exception()
if isinstance(exc, HTTPError) and exc.response is not None:
response = exc.response
elif exc is not None:
response = HTTPResponse(
request, 599, error=exc,
request_time=time.time() - request.start_time)
else:
response = future.result()
# 注册HTTP请求完成的回调
self.io_loop.add_callback(callback, response)
# 注册future的回调
future.add_done_callback(handle_future)
# fetch_impl完成的回调
def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
# "发送"请求, 返回`future`
self.fetch_impl(request, handle_response)
return future
SimpleAsyncHTTPClient
客户端是以队列的形式管理HTTP请求的, 单个实例允许同时发起和处理的最大请求数(即max_clients
)默认是10, 其内部工作流程如下:
- 将新的HTTP请求加入队列
- 将请求放入"等待区", 判断当前"工作区"的请求数, 如果超出可处理的请求数则为其在
ioloop
注册超时事件(超时事件是将请求移出等待区, 并通过在ioloop
注册回调的方式抛出异常) - 处理"等待区"的请求, 先注销
ioloop
中相应的超时事件(如果有的话), 然后将请求放入"工作区", 接着发出异步HTTP请求, 请求包含了释放资源的回调, 回调会将请求移出"工作区", 并重复这一步骤, 继续处理"等待区"的请求, 直到"等待区"被清空.
class SimpleAsyncHTTPClient(AsyncHTTPClient):
def initialize(self, io_loop, max_clients=10,
hostname_mapping=None, max_buffer_size=104857600,
resolver=None, defaults=None, max_header_size=None,
max_body_size=None):
super(SimpleAsyncHTTPClient, self).initialize(io_loop,
defaults=defaults)
self.max_clients = max_clients
self.queue = collections.deque()
self.active = {}
self.waiting = {}
self.max_buffer_size = max_buffer_size
self.max_header_size = max_header_size
self.max_body_size = max_body_size
# TCPClient could create a Resolver for us, but we have to do it
# ourselves to support hostname_mapping.
if resolver:
self.resolver = resolver
self.own_resolver = False
else:
self.resolver = Resolver(io_loop=io_loop)
self.own_resolver = True
if hostname_mapping is not None:
self.resolver = OverrideResolver(resolver=self.resolver,
mapping=hostname_mapping)
self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
def fetch_impl(self, request, callback):
# 将请求加入`collections.deque()`队列中
key = object()
self.queue.append((key, request, callback))
# 对于超出最大同时请求量的请求, 为其在ioloop注册超时事件,
if not len(self.active) < self.max_clients:
timeout_handle = self.io_loop.add_timeout(
self.io_loop.time() + min(request.connect_timeout,
request.request_timeout),
functools.partial(self._on_timeout, key))
else:
timeout_handle = None
# 在`waiting`(等待发出的请求)中添加这`整个请求事件`(包括请求本身, 请求回调, 超时处理)
self.waiting[key] = (request, callback, timeout_handle)
# 处理请求队列
self._process_queue()
if self.queue:
gen_log.debug("max_clients limit reached, request queued. "
"%d active, %d queued requests." % (
len(self.active), len(self.queue)))
def _process_queue(self):
with stack_context.NullContext():
# 只能同时处理不超过 `max_clients` 的请求数
while self.queue and len(self.active) < self.max_clients:
# 从队列中获取在`等待`的请求,
key, request, callback = self.queue.popleft()
if key not in self.waiting:
continue
# 注销超时事件(如果有)并退出`waiting`, 在`active`(正在处理的请求)中添加请求
self._remove_timeout(key)
self.active[key] = (request, callback)
# 构建释放资源的回调函数
release_callback = functools.partial(self._release_fetch, key)
# `真正`发送HTTP请求的操作
self._handle_request(request, release_callback, callback)
def _handle_request(self, request, release_callback, final_callback):
self._connection_class()(
self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.tcp_client,
self.max_header_size, self.max_body_size)
def _release_fetch(self, key):
# 完成请求后释放资源, 退出`active`
del self.active[key]
self._process_queue()
def _remove_timeout(self, key):
# 注销超时事件(如果有)并退出`waiting`
if key in self.waiting:
request, callback, timeout_handle = self.waiting[key]
if timeout_handle is not None:
self.io_loop.remove_timeout(timeout_handle)
del self.waiting[key]
def _on_timeout(self, key):
# 退出`waiting`并注册超时响应回调
request, callback, timeout_handle = self.waiting[key]
self.queue.remove((key, request, callback))
timeout_response = HTTPResponse(
request, 599, error=HTTPError(599, "Timeout"),
request_time=self.io_loop.time() - request.start_time)
self.io_loop.add_callback(callback, timeout_response)
del self.waiting[key]
官方爬虫demo(源码地址)
这个爬虫来自Tornado源码附带的demo, 使用其内置的队列, 协程和异步HTTP客户端实现一个"全站"爬虫. 理解这个demo对于理解协程和异步客户端都有一定帮助.
先简单说明这个爬虫的工作原理:
- 创建一个队列, 用于存放等待爬取的网页url, 并将网站的"根url"放入队列中
- 开启10个
worker
(可以理解为10个异步客户端)爬网站, 并为这个爬虫任务设置一个超时时间, 如果超时将抛出异常终止任务 - 每个
worker
的工作流程大致为:- 从队列获取一个新的url加入到"工作区",
- 爬取新url对应的页面
- 获取新页面中的所有url
- 将本次已爬取的url加入"完成区", 移出"工作区"
- 按照规定筛选前面获取到的新url, 然后将筛选的结果加入队列
- 重复第一步, 直到队列清空
#!/usr/bin/env python
import time
from datetime import timedelta
try:
from HTMLParser import HTMLParser
from urlparse import urljoin, urldefrag
except ImportError:
from html.parser import HTMLParser
from urllib.parse import urljoin, urldefrag
from tornado import httpclient, gen, ioloop, queues
# 设定根url和最大worker数
base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10
@gen.coroutine
def get_links_from_url(url):
# 爬取网页内容并获取网页中所有去尾的(去除锚点"#"及其后续内容)url
try:
response = yield httpclient.AsyncHTTPClient().fetch(url)
print('fetched %s' % url)
html = response.body if isinstance(response.body, str) \
else response.body.decode()
urls = [urljoin(url, remove_fragment(new_url))
for new_url in get_links(html)]
except Exception as e:
print('Exception: %s %s' % (e, url))
raise gen.Return([])
raise gen.Return(urls)
def remove_fragment(url):
# 去除url中锚点"#"及其后续内容
pure_url, frag = urldefrag(url)
return pure_url
def get_links(html):
# 获取网页中所有<a>标签url
class URLSeeker(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.urls = []
def handle_starttag(self, tag, attrs):
href = dict(attrs).get('href')
if href and tag == 'a':
self.urls.append(href)
url_seeker = URLSeeker()
url_seeker.feed(html)
return url_seeker.urls
@gen.coroutine
def main():
# 创建url队列, "工作区" 和 "完成区"
q = queues.Queue()
start = time.time()
fetching, fetched = set(), set()
@gen.coroutine
def fetch_url():
# 从队列中获取新的url
current_url = yield q.get()
try:
# 当前url已经在爬取(即url在"工作区中"), 就退出
if current_url in fetching:
return
print('fetching %s' % current_url)
# 将url加入"工作区", 爬取内容, 完成后加入"完成区"
fetching.add(current_url)
urls = yield get_links_from_url(current_url)
fetched.add(current_url)
# 筛选新的url并将其加入队列
for new_url in urls:
# Only follow links beneath the base URL
if new_url.startswith(base_url):
yield q.put(new_url)
# 完成后调用`q.task_done`, 队列中的未完成任务计数将减1, 与`q.get`一一对应
# 为的是配合`q.join`, 当计数为0时, `q.join`会结束"等待"
finally:
q.task_done()
# worker
@gen.coroutine
def worker():
while True:
yield fetch_url()
# 为队列添加第一个url("任务")
q.put(base_url)
# 启动workers, 然后"等待"队列清空, 时间为300秒, 超时会抛出异常
# 完成后检查"url队列"和"完成区"是否一致, 最后输出任务耗时
for _ in range(concurrency):
worker()
# 这里使用了`.join`阻塞等待, 直到队列被清空才会恢复, 继续往下走
yield q.join(timeout=timedelta(seconds=300))
assert fetching == fetched
print('Done in %d seconds, fetched %s URLs.' % (
time.time() - start, len(fetched)))
if __name__ == '__main__':
import logging
logging.basicConfig()
# 启动任务
io_loop = ioloop.IOLoop.current()
io_loop.run_sync(main)
本节内容就是这些, 下节内容将通过两种方式实现的聊天室demo, 介绍WebSocket与长轮询在Tornado中的实现方法.
NEXT ===> Tornado应用笔记06-WebSocket与长轮询